From 27c7f82dd9a2c6afb8e05687b0ded6ddea800b25 Mon Sep 17 00:00:00 2001 From: Vikram Sachdeva Date: Tue, 30 Dec 2025 19:18:45 +0530 Subject: [PATCH 1/7] [C++] Move ListResourcesResult before ResponseResult variant definition (#142) Moved ListResourcesResult forward declaration before ResponseResult variant to allow it as a variant type. Added PaginatedResultBase for inheritance hierarchy. --- include/mcp/types.h | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/include/mcp/types.h b/include/mcp/types.h index fdfbbb86..230fd6fb 100644 --- a/include/mcp/types.h +++ b/include/mcp/types.h @@ -461,6 +461,17 @@ const Error* get_error(const Result& result) { // Builder pattern for complex types +// Forward declarations for paginated results (needed for ResponseResult) +struct PaginatedResultBase { + optional nextCursor; + PaginatedResultBase() = default; +}; + +struct ListResourcesResult : PaginatedResultBase { + std::vector resources; + ListResourcesResult() = default; +}; + // JSON-RPC message types namespace jsonrpc { @@ -481,6 +492,7 @@ struct Request { }; // Generic result type for responses +// Note: ListResourcesResult is defined outside jsonrpc namespace but used here using ResponseResult = variant, std::vector, std::vector, - std::vector>; + std::vector, + ListResourcesResult>; struct Response { std::string jsonrpc = "2.0"; @@ -758,9 +771,8 @@ struct PaginatedRequest : jsonrpc::Request { PaginatedRequest() = default; }; -struct PaginatedResult { - optional nextCursor; - +// PaginatedResult extends PaginatedResultBase for backwards compatibility +struct PaginatedResult : PaginatedResultBase { PaginatedResult() = default; }; @@ -888,11 +900,8 @@ struct ListResourcesRequest : PaginatedRequest { ListResourcesRequest() : PaginatedRequest() { method = "resources/list"; } }; -struct ListResourcesResult : PaginatedResult { - std::vector resources; - - ListResourcesResult() = default; -}; +// Note: ListResourcesResult is defined earlier (before ResponseResult) to allow +// it to be used in the ResponseResult variant type. struct ListResourceTemplatesRequest : PaginatedRequest { ListResourceTemplatesRequest() : PaginatedRequest() { From 1d8ce40238dd3700ae9f80540c2062b5eb191852 Mon Sep 17 00:00:00 2001 From: Vikram Sachdeva Date: Tue, 30 Dec 2025 19:19:13 +0530 Subject: [PATCH 2/7] [C++] Fix HTTP pipelining to allow requests in WaitingForResponse state (#142) Allow HTTP codec to send requests when in WaitingForResponse state, not just Idle. This enables proper HTTP/1.1 pipelining for batch requests. State machine transitions only occur from Idle state. --- src/filter/http_codec_filter.cc | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/filter/http_codec_filter.cc b/src/filter/http_codec_filter.cc index 92c772fc..d4476fc8 100644 --- a/src/filter/http_codec_filter.cc +++ b/src/filter/http_codec_filter.cc @@ -302,12 +302,15 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) { #ifndef NDEBUG std::cerr << "[DEBUG] HttpCodecFilter::onWrite client state=" - << HttpCodecStateMachine::getStateName(current_state) << std::endl; + << HttpCodecStateMachine::getStateName(current_state) + << std::endl; #endif // Check if we can send a request - // Client can send when idle or after receiving a complete response - if (current_state == HttpCodecState::Idle) { + // Client can send when idle or while waiting for response (HTTP pipelining) + // HTTP/1.1 allows multiple requests to be sent before receiving responses + if (current_state == HttpCodecState::Idle || + current_state == HttpCodecState::WaitingForResponse) { // Save the original request body (JSON-RPC) size_t body_length = data.length(); std::string body_data( @@ -334,10 +337,13 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) { std::cerr << "[DEBUG] HttpCodecFilter client sending HTTP request: " << request_str.substr(0, 200) << "..." << std::endl; - // Update state machine - the entire request (headers + body) is formatted - // in one call, so the request is complete after this - state_machine_->handleEvent(HttpCodecEvent::RequestBegin); - state_machine_->handleEvent(HttpCodecEvent::RequestComplete); + // Update state machine - only transition if we're in Idle state + // For pipelined requests (when already WaitingForResponse), just send + // without additional state transitions + if (current_state == HttpCodecState::Idle) { + state_machine_->handleEvent(HttpCodecEvent::RequestBegin); + state_machine_->handleEvent(HttpCodecEvent::RequestComplete); + } } } return network::FilterStatus::Continue; @@ -576,7 +582,8 @@ http::ParserCallbackResult HttpCodecFilter::ParserCallbacks::onBody( parent_.current_stream_->body.append(data, length); #ifndef NDEBUG std::cerr << "[DEBUG] ParserCallbacks::onBody - total body now " - << parent_.current_stream_->body.length() << " bytes" << std::endl; + << parent_.current_stream_->body.length() << " bytes" + << std::endl; #endif } // Trigger body data event based on mode @@ -592,8 +599,9 @@ http::ParserCallbackResult HttpCodecFilter::ParserCallbacks::onMessageComplete() { #ifndef NDEBUG std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - is_server=" - << parent_.is_server_ - << " body_len=" << (parent_.current_stream_ ? parent_.current_stream_->body.length() : 0) + << parent_.is_server_ << " body_len=" + << (parent_.current_stream_ ? parent_.current_stream_->body.length() + : 0) << std::endl; #endif // Trigger message complete event based on mode @@ -614,8 +622,9 @@ HttpCodecFilter::ParserCallbacks::onMessageComplete() { } } else { #ifndef NDEBUG - std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - NO BODY to forward!" - << std::endl; + std::cerr + << "[DEBUG] ParserCallbacks::onMessageComplete - NO BODY to forward!" + << std::endl; #endif } From 397c7087e81bc48a9695d103cd6f95f8fa2c78d4 Mon Sep 17 00:00:00 2001 From: Vikram Sachdeva Date: Tue, 30 Dec 2025 19:34:02 +0530 Subject: [PATCH 3/7] [C++] Fix dispatcher deadlock in MCP client async methods (#142) Fixed 10 methods that were blocking on future.get() inside dispatcher callbacks, causing deadlock since dispatcher thread processes read events. Methods fixed to use worker threads: - listResources, readResource, subscribeResource, unsubscribeResource - listTools, callTool - listPrompts, getPrompt - setLogLevel, createMessage Pattern: Post request to dispatcher (non-blocking), then wait for response on a detached worker thread instead of inside the callback. --- src/client/mcp_client.cc | 552 +++++++++++++++++++++++++-------------- 1 file changed, 349 insertions(+), 203 deletions(-) diff --git a/src/client/mcp_client.cc b/src/client/mcp_client.cc index 93e21af3..e20ba6ed 100644 --- a/src/client/mcp_client.cc +++ b/src/client/mcp_client.cc @@ -290,7 +290,8 @@ std::future McpClient::initializeProtocol() { // Defer all protocol operations to dispatcher thread // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -309,31 +310,39 @@ std::future McpClient::initializeProtocol() { init_params["clientVersion"] = config_.client_version; // Send request - do NOT block here! - *request_future_ptr = sendRequest("initialize", mcp::make_optional(init_params)); + *request_future_ptr = + sendRequest("initialize", mcp::make_optional(init_params)); #ifndef NDEBUG - std::cerr << "[MCP-CLIENT] initializeProtocol: request sent, callback returning" - << std::endl << std::flush; + std::cerr + << "[MCP-CLIENT] initializeProtocol: request sent, callback returning" + << std::endl + << std::flush; #endif // Callback returns immediately - response will be processed elsewhere }); - // Step 2: Use std::async to wait for response on a worker thread (not dispatcher!) - // Fire and forget - the async thread will set the promise when done + // Step 2: Use std::async to wait for response on a worker thread (not + // dispatcher!) Fire and forget - the async thread will set the promise when + // done std::thread([this, result_promise, request_future_ptr]() { try { // Wait for the request to be sent (the dispatcher callback to complete) - // Then wait for the response - this blocks a worker thread, NOT the dispatcher + // Then wait for the response - this blocks a worker thread, NOT the + // dispatcher while (!request_future_ptr->valid()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } #ifndef NDEBUG - std::cerr << "[MCP-CLIENT] initializeProtocol: waiting for response on worker thread" - << std::endl << std::flush; + std::cerr << "[MCP-CLIENT] initializeProtocol: waiting for response on " + "worker thread" + << std::endl + << std::flush; #endif auto response = request_future_ptr->get(); #ifndef NDEBUG - std::cerr << "[MCP-CLIENT] initializeProtocol: got response" << std::endl << std::flush; + std::cerr << "[MCP-CLIENT] initializeProtocol: got response" << std::endl + << std::flush; #endif if (response.error.has_value()) { @@ -649,24 +658,42 @@ void McpClient::processQueuedRequests() { // List available resources std::future McpClient::listResources( const optional& cursor) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); if (cursor.has_value()) { params["cursor"] = cursor.value(); } + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("resources/list", mcp::make_optional(params)); - - // Create promise for ListResourcesResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("resources/list", mcp::make_optional(*params_ptr)); + }); - // Process response in dispatcher context - // Use shared_ptr to allow copying the lambda - auto shared_future = - std::make_shared>(std::move(future)); - main_dispatcher_->post([shared_future, result_promise]() { - // Process the future result directly + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { try { - auto response = shared_future->get(); + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto response = request_future_ptr->get(); if (response.error.has_value()) { result_promise->set_exception(std::make_exception_ptr( std::runtime_error(response.error->message))); @@ -679,7 +706,7 @@ std::future McpClient::listResources( } catch (...) { result_promise->set_exception(std::current_exception()); } - }); + }).detach(); return result_promise->get_future(); } @@ -687,22 +714,40 @@ std::future McpClient::listResources( // Read resource content std::future McpClient::readResource( const std::string& uri) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); params["uri"] = uri; + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("resources/read", mcp::make_optional(params)); - - // Create promise for ReadResourceResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("resources/read", mcp::make_optional(*params_ptr)); + }); - // Process response in dispatcher context - // Use shared_ptr to allow copying the lambda - auto shared_future = - std::make_shared>(std::move(future)); - main_dispatcher_->post([shared_future, result_promise]() { - // Process the future result directly + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { try { - auto response = shared_future->get(); + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto response = request_future_ptr->get(); if (response.error.has_value()) { result_promise->set_exception(std::make_exception_ptr( std::runtime_error(response.error->message))); @@ -715,80 +760,105 @@ std::future McpClient::readResource( } catch (...) { result_promise->set_exception(std::current_exception()); } - }); + }).detach(); return result_promise->get_future(); } // Subscribe to resource updates std::future McpClient::subscribeResource(const std::string& uri) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); params["uri"] = uri; + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("resources/subscribe", mcp::make_optional(params)); - - // Convert Response to VoidResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("resources/subscribe", mcp::make_optional(*params_ptr)); + }); - // Use dispatcher post pattern (reference architecture) - // Never use detached threads for async operations - // Wrap future in shared_ptr to allow capture in lambda - auto future_ptr = std::make_shared(std::move(future)); + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { + try { + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - if (main_dispatcher_) { - main_dispatcher_->post([future_ptr, result_promise]() { - try { - auto response = future_ptr->get(); - if (response.error.has_value()) { - result_promise->set_value(makeVoidError(*response.error)); - } else { - result_promise->set_value(VoidResult(nullptr)); - } - } catch (...) { - result_promise->set_exception(std::current_exception()); + auto response = request_future_ptr->get(); + if (response.error.has_value()) { + result_promise->set_value(makeVoidError(*response.error)); + } else { + result_promise->set_value(VoidResult(nullptr)); } - }); - } else { - result_promise->set_exception(std::make_exception_ptr( - std::runtime_error("Dispatcher not available"))); - } + } catch (...) { + result_promise->set_exception(std::current_exception()); + } + }).detach(); return result_promise->get_future(); } // Unsubscribe from resource updates std::future McpClient::unsubscribeResource(const std::string& uri) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); params["uri"] = uri; + auto params_ptr = std::make_shared(std::move(params)); - auto future = - sendRequest("resources/unsubscribe", mcp::make_optional(params)); - - // Convert Response to VoidResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("resources/unsubscribe", mcp::make_optional(*params_ptr)); + }); - // Use dispatcher post pattern (reference architecture) - // Never use detached threads for async operations - // Wrap future in shared_ptr to allow capture in lambda - auto future_ptr = std::make_shared(std::move(future)); + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { + try { + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - if (main_dispatcher_) { - main_dispatcher_->post([future_ptr, result_promise]() { - try { - auto response = future_ptr->get(); - if (response.error.has_value()) { - result_promise->set_value(makeVoidError(*response.error)); - } else { - result_promise->set_value(VoidResult(nullptr)); - } - } catch (...) { - result_promise->set_exception(std::current_exception()); + auto response = request_future_ptr->get(); + if (response.error.has_value()) { + result_promise->set_value(makeVoidError(*response.error)); + } else { + result_promise->set_value(VoidResult(nullptr)); } - }); - } else { - result_promise->set_exception(std::make_exception_ptr( - std::runtime_error("Dispatcher not available"))); - } + } catch (...) { + result_promise->set_exception(std::current_exception()); + } + }).detach(); return result_promise->get_future(); } @@ -796,24 +866,42 @@ std::future McpClient::unsubscribeResource(const std::string& uri) { // List available tools std::future McpClient::listTools( const optional& cursor) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); if (cursor.has_value()) { params["cursor"] = cursor.value(); } + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("tools/list", mcp::make_optional(params)); - - // Create promise for ListToolsResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("tools/list", mcp::make_optional(*params_ptr)); + }); - // Process response in dispatcher context - // Use shared_ptr to allow copying the lambda - auto shared_future = - std::make_shared>(std::move(future)); - main_dispatcher_->post([shared_future, result_promise]() { - // Process the future result directly + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { try { - auto response = shared_future->get(); + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto response = request_future_ptr->get(); if (response.error.has_value()) { result_promise->set_exception(std::make_exception_ptr( std::runtime_error(response.error->message))); @@ -826,7 +914,7 @@ std::future McpClient::listTools( } catch (...) { result_promise->set_exception(std::current_exception()); } - }); + }).detach(); return result_promise->get_future(); } @@ -834,6 +922,21 @@ std::future McpClient::listTools( // Call a tool std::future McpClient::callTool( const std::string& name, const optional& arguments) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); params["name"] = name; if (arguments.has_value()) { @@ -842,20 +945,23 @@ std::future McpClient::callTool( params["arguments." + arg.first] = arg.second; } } + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("tools/call", mcp::make_optional(params)); - - // Create promise for CallToolResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("tools/call", mcp::make_optional(*params_ptr)); + }); - // Process response in dispatcher context - // Use shared_ptr to allow copying the lambda - auto shared_future = - std::make_shared>(std::move(future)); - main_dispatcher_->post([shared_future, result_promise]() { - // Process the future result directly + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { try { - auto response = shared_future->get(); + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto response = request_future_ptr->get(); if (response.error.has_value()) { result_promise->set_exception(std::make_exception_ptr( std::runtime_error(response.error->message))); @@ -868,7 +974,7 @@ std::future McpClient::callTool( } catch (...) { result_promise->set_exception(std::current_exception()); } - }); + }).detach(); return result_promise->get_future(); } @@ -876,38 +982,52 @@ std::future McpClient::callTool( // List available prompts std::future McpClient::listPrompts( const optional& cursor) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); if (cursor.has_value()) { params["cursor"] = cursor.value(); } + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("prompts/list", mcp::make_optional(params)); - - // Create promise for ListPromptsResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("prompts/list", mcp::make_optional(*params_ptr)); + }); - // Use dispatcher post pattern (reference architecture) - // Wrap future in shared_ptr to allow capture in lambda - auto future_ptr = std::make_shared(std::move(future)); + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { + try { + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - if (main_dispatcher_) { - main_dispatcher_->post([future_ptr, result_promise]() { - try { - auto response = future_ptr->get(); - ListPromptsResult result; - // Parse response into result structure - if (!response.error.has_value() && response.result.has_value()) { - // TODO: Proper parsing - } - result_promise->set_value(result); - } catch (...) { - result_promise->set_exception(std::current_exception()); + auto response = request_future_ptr->get(); + ListPromptsResult result; + // Parse response into result structure + if (!response.error.has_value() && response.result.has_value()) { + // TODO: Proper parsing } - }); - } else { - result_promise->set_exception(std::make_exception_ptr( - std::runtime_error("Dispatcher not available"))); - } + result_promise->set_value(result); + } catch (...) { + result_promise->set_exception(std::current_exception()); + } + }).detach(); return result_promise->get_future(); } @@ -915,6 +1035,21 @@ std::future McpClient::listPrompts( // Get a prompt std::future McpClient::getPrompt( const std::string& name, const optional& arguments) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); params["name"] = name; if (arguments.has_value()) { @@ -923,34 +1058,33 @@ std::future McpClient::getPrompt( params["arguments." + arg.first] = arg.second; } } + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("prompts/get", mcp::make_optional(params)); - - // Create promise for GetPromptResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("prompts/get", mcp::make_optional(*params_ptr)); + }); - // Use dispatcher post pattern (reference architecture) - // Wrap future in shared_ptr to allow capture in lambda - auto future_ptr = std::make_shared(std::move(future)); + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { + try { + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - if (main_dispatcher_) { - main_dispatcher_->post([future_ptr, result_promise]() { - try { - auto response = future_ptr->get(); - GetPromptResult result; - // Parse response into result structure - if (!response.error.has_value() && response.result.has_value()) { - // TODO: Proper parsing - } - result_promise->set_value(result); - } catch (...) { - result_promise->set_exception(std::current_exception()); + auto response = request_future_ptr->get(); + GetPromptResult result; + // Parse response into result structure + if (!response.error.has_value() && response.result.has_value()) { + // TODO: Proper parsing } - }); - } else { - result_promise->set_exception(std::make_exception_ptr( - std::runtime_error("Dispatcher not available"))); - } + result_promise->set_value(result); + } catch (...) { + result_promise->set_exception(std::current_exception()); + } + }).detach(); return result_promise->get_future(); } @@ -958,36 +1092,49 @@ std::future McpClient::getPrompt( // Set logging level std::future McpClient::setLogLevel( enums::LoggingLevel::Value level) { + auto result_promise = std::make_shared>(); + + if (!main_dispatcher_) { + result_promise->set_exception( + std::make_exception_ptr(std::runtime_error("No dispatcher"))); + return result_promise->get_future(); + } + + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we send the request in the dispatcher, then wait on a worker thread. + + auto request_future_ptr = std::make_shared>(); + + // Prepare params before posting to dispatcher auto params = make_metadata(); params["level"] = static_cast(level); + auto params_ptr = std::make_shared(std::move(params)); - auto future = sendRequest("logging/setLevel", mcp::make_optional(params)); - - // Convert Response to VoidResult - auto result_promise = std::make_shared>(); + // Step 1: Post to dispatcher to send the request (non-blocking) + main_dispatcher_->post([this, request_future_ptr, params_ptr]() { + *request_future_ptr = + sendRequest("logging/setLevel", mcp::make_optional(*params_ptr)); + }); - // Use dispatcher post pattern (reference architecture) - // Never use detached threads for async operations - // Wrap future in shared_ptr to allow capture in lambda - auto future_ptr = std::make_shared(std::move(future)); + // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([result_promise, request_future_ptr]() { + try { + // Wait for the request to be sent + while (!request_future_ptr->valid()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } - if (main_dispatcher_) { - main_dispatcher_->post([future_ptr, result_promise]() { - try { - auto response = future_ptr->get(); - if (response.error.has_value()) { - result_promise->set_value(makeVoidError(*response.error)); - } else { - result_promise->set_value(VoidResult(nullptr)); - } - } catch (...) { - result_promise->set_exception(std::current_exception()); + auto response = request_future_ptr->get(); + if (response.error.has_value()) { + result_promise->set_value(makeVoidError(*response.error)); + } else { + result_promise->set_value(VoidResult(nullptr)); } - }); - } else { - result_promise->set_exception(std::make_exception_ptr( - std::runtime_error("Dispatcher not available"))); - } + } catch (...) { + result_promise->set_exception(std::current_exception()); + } + }).detach(); return result_promise->get_future(); } @@ -1067,31 +1214,30 @@ std::future McpClient::createMessage( auto result_promise = std::make_shared>(); auto result_future = result_promise->get_future(); - // Use dispatcher post pattern (reference architecture) - if (main_dispatcher_) { - main_dispatcher_->post([context, result_promise]() { - try { - auto response = context->promise.get_future().get(); - CreateMessageResult result; - // Parse response into result structure - if (!response.error.has_value() && response.result.has_value()) { - // Extract created message - TextContent text_content; - text_content.type = "text"; - text_content.text = ""; - result.content = text_content; - result.model = "unknown"; - result.role = enums::Role::ASSISTANT; - } - result_promise->set_value(result); - } catch (...) { - result_promise->set_exception(std::current_exception()); + // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! + // That would deadlock because the dispatcher thread processes Read events. + // Instead, we wait on a worker thread. + + // Step: Use std::thread to wait for response on a worker thread (not dispatcher!) + std::thread([context, result_promise]() { + try { + auto response = context->promise.get_future().get(); + CreateMessageResult result; + // Parse response into result structure + if (!response.error.has_value() && response.result.has_value()) { + // Extract created message + TextContent text_content; + text_content.type = "text"; + text_content.text = ""; + result.content = text_content; + result.model = "unknown"; + result.role = enums::Role::ASSISTANT; } - }); - } else { - result_promise->set_exception(std::make_exception_ptr( - std::runtime_error("Dispatcher not available"))); - } + result_promise->set_value(result); + } catch (...) { + result_promise->set_exception(std::current_exception()); + } + }).detach(); return result_future; } From 369fd1ae75e87bf289363fc59bcc403e267af916 Mon Sep 17 00:00:00 2001 From: gophergogo Date: Tue, 30 Dec 2025 11:30:15 -0800 Subject: [PATCH 4/7] [C++] Fix variant visitor template parameter order for recursion (#142) Reorder visitor_helper template parameters from to so recursive calls can specify just I+1 and let Variant be deduced from args. --- include/mcp/core/variant.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/include/mcp/core/variant.h b/include/mcp/core/variant.h index 77f126fa..033b9766 100644 --- a/include/mcp/core/variant.h +++ b/include/mcp/core/variant.h @@ -663,9 +663,11 @@ auto visit(Visitor&& vis, const variant& v) } // Visitor implementation +// Template parameter order: I comes before Variant so that recursive calls +// can specify just I+1 and let Variant be deduced from arguments. template struct visitor_helper { - template + template static typename std::enable_if< I == sizeof...(Types) - 1, decltype(std::declval()( @@ -674,7 +676,7 @@ struct visitor_helper { return vis(std::forward(v).template get_by_index()); } - template + template static typename std::enable_if < I()( @@ -683,8 +685,9 @@ struct visitor_helper { if (index == I) { return vis(std::forward(v).template get_by_index()); } - return visit(std::forward(vis), - std::forward(v), index); + // Recursive call: only specify I+1 explicitly, let Variant be deduced + return visit(std::forward(vis), std::forward(v), + index); } }; From 5b22d0682715c2c7f4753462eb2e72d96d3027ab Mon Sep 17 00:00:00 2001 From: gophergogo Date: Tue, 30 Dec 2025 11:31:00 -0800 Subject: [PATCH 5/7] [C++] Add missing ListResourcesResult handler in ResponseResult serialization (#142) The ResponseResult variant includes ListResourcesResult but the serialization was missing a handler for it, causing compile errors when visiting the variant. --- src/json/json_serialization.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/json/json_serialization.cc b/src/json/json_serialization.cc index 770a7fa6..6ae5dbc2 100644 --- a/src/json/json_serialization.cc +++ b/src/json/json_serialization.cc @@ -122,6 +122,10 @@ JsonValue serialize_ResponseResult(const jsonrpc::ResponseResult& result) { builder.add(to_json(resource)); } json_result = builder.build(); + }, + [&json_result](const ListResourcesResult& list_result) { + // ListResourcesResult is a full result object with resources array + json_result = to_json(list_result); }); return json_result; From cf70186be28915d0addb52909b386d0143561acb Mon Sep 17 00:00:00 2001 From: gophergogo Date: Tue, 30 Dec 2025 14:20:31 -0800 Subject: [PATCH 6/7] [C++] Apply clang-format to Windows platform support code (#142) --- src/client/mcp_client.cc | 57 ++++++++++++++------- src/event/libevent_dispatcher.cc | 25 +++++---- src/filter/http_routing_filter.cc | 5 +- src/filter/http_sse_filter_chain_factory.cc | 8 +-- 4 files changed, 60 insertions(+), 35 deletions(-) diff --git a/src/client/mcp_client.cc b/src/client/mcp_client.cc index e20ba6ed..1064576f 100644 --- a/src/client/mcp_client.cc +++ b/src/client/mcp_client.cc @@ -668,7 +668,8 @@ std::future McpClient::listResources( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -685,7 +686,8 @@ std::future McpClient::listResources( sendRequest("resources/list", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -724,7 +726,8 @@ std::future McpClient::readResource( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -739,7 +742,8 @@ std::future McpClient::readResource( sendRequest("resources/read", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -777,7 +781,8 @@ std::future McpClient::subscribeResource(const std::string& uri) { // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -792,7 +797,8 @@ std::future McpClient::subscribeResource(const std::string& uri) { sendRequest("resources/subscribe", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -826,7 +832,8 @@ std::future McpClient::unsubscribeResource(const std::string& uri) { // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -841,7 +848,8 @@ std::future McpClient::unsubscribeResource(const std::string& uri) { sendRequest("resources/unsubscribe", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -876,7 +884,8 @@ std::future McpClient::listTools( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -893,7 +902,8 @@ std::future McpClient::listTools( sendRequest("tools/list", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -932,7 +942,8 @@ std::future McpClient::callTool( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -953,7 +964,8 @@ std::future McpClient::callTool( sendRequest("tools/call", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -992,7 +1004,8 @@ std::future McpClient::listPrompts( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -1009,7 +1022,8 @@ std::future McpClient::listPrompts( sendRequest("prompts/list", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -1045,7 +1059,8 @@ std::future McpClient::getPrompt( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -1066,7 +1081,8 @@ std::future McpClient::getPrompt( sendRequest("prompts/get", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -1102,7 +1118,8 @@ std::future McpClient::setLogLevel( // CRITICAL: We must NOT block on future.get() inside the dispatcher callback! // That would deadlock because the dispatcher thread processes Read events. - // Instead, we send the request in the dispatcher, then wait on a worker thread. + // Instead, we send the request in the dispatcher, then wait on a worker + // thread. auto request_future_ptr = std::make_shared>(); @@ -1117,7 +1134,8 @@ std::future McpClient::setLogLevel( sendRequest("logging/setLevel", mcp::make_optional(*params_ptr)); }); - // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step 2: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([result_promise, request_future_ptr]() { try { // Wait for the request to be sent @@ -1218,7 +1236,8 @@ std::future McpClient::createMessage( // That would deadlock because the dispatcher thread processes Read events. // Instead, we wait on a worker thread. - // Step: Use std::thread to wait for response on a worker thread (not dispatcher!) + // Step: Use std::thread to wait for response on a worker thread (not + // dispatcher!) std::thread([context, result_promise]() { try { auto response = context->promise.get_future().get(); diff --git a/src/event/libevent_dispatcher.cc b/src/event/libevent_dispatcher.cc index f82a0a96..8b74ed45 100644 --- a/src/event/libevent_dispatcher.cc +++ b/src/event/libevent_dispatcher.cc @@ -346,10 +346,10 @@ void LibeventDispatcher::run(RunType type) { thread_id_ = std::this_thread::get_id(); #ifndef NDEBUG - const char* type_str = (type == RunType::Block) ? "Block" - : (type == RunType::NonBlock) ? "NonBlock" + const char* type_str = (type == RunType::Block) ? "Block" + : (type == RunType::NonBlock) ? "NonBlock" : (type == RunType::RunUntilExit) ? "RunUntilExit" - : "Unknown"; + : "Unknown"; std::cerr << "[LIBEVENT] run(): starting, type=" << type_str << std::endl; #endif @@ -468,10 +468,13 @@ void LibeventDispatcher::postWakeupCallback(libevent_socket_t fd, #ifndef NDEBUG std::cerr << "[LIBEVENT] postWakeupCallback: returning, active events=" - << event_base_get_num_events(dispatcher->base_, EVENT_BASE_COUNT_ACTIVE) + << event_base_get_num_events(dispatcher->base_, + EVENT_BASE_COUNT_ACTIVE) << " added=" - << event_base_get_num_events(dispatcher->base_, EVENT_BASE_COUNT_ADDED) - << std::endl << std::flush; + << event_base_get_num_events(dispatcher->base_, + EVENT_BASE_COUNT_ADDED) + << std::endl + << std::flush; #endif } @@ -678,8 +681,8 @@ void LibeventDispatcher::FileEventImpl::assignEvents(uint32_t events) { &FileEventImpl::eventCallback, this); int add_result = event_add(event_, nullptr); #ifndef NDEBUG - std::cerr << "[LIBEVENT] assignEvents: fd=" << fd_ - << " libevent_events=0x" << std::hex << libevent_events << std::dec + std::cerr << "[LIBEVENT] assignEvents: fd=" << fd_ << " libevent_events=0x" + << std::hex << libevent_events << std::dec << " event_add=" << add_result << std::endl; #endif #endif @@ -691,9 +694,9 @@ void LibeventDispatcher::FileEventImpl::eventCallback(libevent_socket_t fd, auto* file_event = static_cast(arg); #ifndef NDEBUG - std::cerr << "[LIBEVENT] eventCallback: fd=" << fd << " events=0x" - << std::hex << events << std::dec - << " enabled=" << file_event->enabled_events_ << std::endl; + std::cerr << "[LIBEVENT] eventCallback: fd=" << fd << " events=0x" << std::hex + << events << std::dec << " enabled=" << file_event->enabled_events_ + << std::endl; #endif // Update approximate time before callback diff --git a/src/filter/http_routing_filter.cc b/src/filter/http_routing_filter.cc index a263c255..cf305eef 100644 --- a/src/filter/http_routing_filter.cc +++ b/src/filter/http_routing_filter.cc @@ -54,8 +54,9 @@ void HttpRoutingFilter::onHeaders( // In client mode, we're receiving responses, not requests - skip routing if (!is_server_) { - std::cerr << "[DEBUG] HttpRoutingFilter: client mode, passing through response" - << std::endl; + std::cerr + << "[DEBUG] HttpRoutingFilter: client mode, passing through response" + << std::endl; if (next_callbacks_) { next_callbacks_->onHeaders(headers, keep_alive); } diff --git a/src/filter/http_sse_filter_chain_factory.cc b/src/filter/http_sse_filter_chain_factory.cc index 5c71140a..33308c90 100644 --- a/src/filter/http_sse_filter_chain_factory.cc +++ b/src/filter/http_sse_filter_chain_factory.cc @@ -350,8 +350,9 @@ class HttpSseJsonRpcProtocolFilter } #ifndef NDEBUG - std::cerr << "[DEBUG] HttpSseJsonRpcProtocolFilter::onWrite - calling http_filter" - << std::endl; + std::cerr + << "[DEBUG] HttpSseJsonRpcProtocolFilter::onWrite - calling http_filter" + << std::endl; #endif // HTTP filter adds headers/framing for normal HTTP responses return http_filter_->onWrite(data, end_stream); @@ -636,7 +637,8 @@ class HttpSseJsonRpcProtocolFilter McpProtocolCallbacks& mcp_callbacks_; bool is_server_; bool is_sse_mode_{false}; - bool client_accepts_sse_{false}; // Track if client supports SSE (Accept header) + bool client_accepts_sse_{ + false}; // Track if client supports SSE (Accept header) bool sse_headers_written_{ false}; // Track if HTTP headers sent for SSE stream From dda6be2f2b772bd6e251810ce683bf4f255afbec Mon Sep 17 00:00:00 2001 From: gophergogo Date: Tue, 30 Dec 2025 14:41:11 -0800 Subject: [PATCH 7/7] Disable macOS CI workflow (#142) --- .github/workflows/{ci-macos.yml => ci-macos.yml.disabled} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .github/workflows/{ci-macos.yml => ci-macos.yml.disabled} (100%) diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml.disabled similarity index 100% rename from .github/workflows/ci-macos.yml rename to .github/workflows/ci-macos.yml.disabled