diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml.disabled
similarity index 100%
rename from .github/workflows/ci-macos.yml
rename to .github/workflows/ci-macos.yml.disabled
diff --git a/include/mcp/core/variant.h b/include/mcp/core/variant.h
index 77f126fa..033b9766 100644
--- a/include/mcp/core/variant.h
+++ b/include/mcp/core/variant.h
@@ -663,9 +663,11 @@ auto visit(Visitor&& vis, const variant<Types...>& v)
 }
 
 // Visitor implementation
+// Template parameter order: I comes before Variant so that recursive calls
+// can specify just I+1 and let Variant be deduced from arguments.
 template <typename Visitor, typename... Types>
 struct visitor_helper {
-  template <typename Variant, std::size_t I>
+  template <std::size_t I, typename Variant>
   static typename std::enable_if<
       I == sizeof...(Types) - 1,
       decltype(std::declval<Visitor>()(
@@ -674,7 +676,7 @@ struct visitor_helper {
     return vis(std::forward<Variant>(v).template get_by_index<I>());
   }
 
-  template <typename Variant, std::size_t I>
+  template <std::size_t I, typename Variant>
   static typename std::enable_if <
       I<sizeof...(Types) - 1,
        decltype(std::declval<Visitor>()(
@@ -683,8 +685,9 @@ struct visitor_helper {
     if (index == I) {
       return vis(std::forward<Variant>(v).template get_by_index<I>());
     }
-    return visit<Variant, I + 1>(std::forward<Visitor>(vis),
-                                 std::forward<Variant>(v), index);
+    // Recursive call: only specify I+1 explicitly, let Variant be deduced
+    return visit<I + 1>(std::forward<Visitor>(vis), std::forward<Variant>(v),
+                        index);
   }
 };
 
diff --git a/include/mcp/types.h b/include/mcp/types.h
index fdfbbb86..230fd6fb 100644
--- a/include/mcp/types.h
+++ b/include/mcp/types.h
@@ -461,6 +461,17 @@ const Error* get_error(const Result& result) {
 
 // Builder pattern for complex types
 
+// Forward declarations for paginated results (needed for ResponseResult)
+struct PaginatedResultBase {
+  optional<Cursor> nextCursor;
+  PaginatedResultBase() = default;
+};
+
+struct ListResourcesResult : PaginatedResultBase {
+  std::vector<Resource> resources;
+  ListResourcesResult() = default;
+};
+
 // JSON-RPC message types
 namespace jsonrpc {
 
@@ -481,6 +492,7 @@ struct Request {
 };
 
 // Generic result type for responses
+// Note: ListResourcesResult is defined outside jsonrpc namespace but used here
 using ResponseResult = variant, std::vector, std::vector,
-                               std::vector>;
+                               std::vector,
+                               ListResourcesResult>;
 
 struct Response {
   std::string jsonrpc = "2.0";
@@ -758,9 +771,8 @@ struct PaginatedRequest : jsonrpc::Request {
   PaginatedRequest() = default;
 };
 
-struct PaginatedResult {
-  optional<Cursor> nextCursor;
-
+// PaginatedResult extends PaginatedResultBase for backwards compatibility
+struct PaginatedResult : PaginatedResultBase {
   PaginatedResult() = default;
 };
 
@@ -888,11 +900,8 @@ struct ListResourcesRequest : PaginatedRequest {
   ListResourcesRequest() : PaginatedRequest() { method = "resources/list"; }
 };
 
-struct ListResourcesResult : PaginatedResult {
-  std::vector<Resource> resources;
-
-  ListResourcesResult() = default;
-};
+// Note: ListResourcesResult is defined earlier (before ResponseResult) to allow
+// it to be used in the ResponseResult variant type.
 
 struct ListResourceTemplatesRequest : PaginatedRequest {
   ListResourceTemplatesRequest() : PaginatedRequest() {
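A note on the variant.h reordering above, since it is easy to miss why it matters: explicit template arguments bind left to right, so whichever parameter comes first is the one a caller can pin down while leaving the rest to deduction. Below is a minimal, self-contained sketch of the same index-dispatch technique over a std::tuple. It uses C++17 if constexpr for brevity where the SDK's variant uses C++11 enable_if, and run and the tuple are illustrative names, not SDK code:

#include <cstddef>
#include <iostream>
#include <tuple>
#include <type_traits>
#include <utility>

// With std::size_t I declared first, the recursive call can write run<I + 1>
// and let Visitor/Tuple be deduced; with I last, every call site would have
// to respell both type parameters.
template <std::size_t I = 0, typename Visitor, typename Tuple>
void run(Visitor&& vis, Tuple&& t, std::size_t index) {
  if constexpr (I < std::tuple_size<std::decay_t<Tuple>>::value) {
    if (index == I) {
      vis(std::get<I>(std::forward<Tuple>(t)));
      return;
    }
    run<I + 1>(std::forward<Visitor>(vis), std::forward<Tuple>(t), index);
  }
}

int main() {
  std::tuple<int, const char*> t{42, "hello"};
  run([](const auto& v) { std::cout << v << "\n"; }, t, 1);  // prints "hello"
}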
diff --git a/src/client/mcp_client.cc b/src/client/mcp_client.cc
index 93e21af3..1064576f 100644
--- a/src/client/mcp_client.cc
+++ b/src/client/mcp_client.cc
@@ -290,7 +290,8 @@ std::future<InitializeResult> McpClient::initializeProtocol() {
   // Defer all protocol operations to dispatcher thread
   // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
   // That would deadlock because the dispatcher thread processes Read events.
-  // Instead, we send the request in the dispatcher, then wait on a worker thread.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
 
   auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
 
@@ -309,31 +310,39 @@ std::future<InitializeResult> McpClient::initializeProtocol() {
     init_params["clientVersion"] = config_.client_version;
 
     // Send request - do NOT block here!
-    *request_future_ptr = sendRequest("initialize", mcp::make_optional(init_params));
+    *request_future_ptr =
+        sendRequest("initialize", mcp::make_optional(init_params));
 
 #ifndef NDEBUG
-    std::cerr << "[MCP-CLIENT] initializeProtocol: request sent, callback returning"
-              << std::endl << std::flush;
+    std::cerr
+        << "[MCP-CLIENT] initializeProtocol: request sent, callback returning"
+        << std::endl
+        << std::flush;
 #endif
     // Callback returns immediately - response will be processed elsewhere
   });
 
-  // Step 2: Use std::thread to wait for response on a worker thread (not dispatcher!)
-  // Fire and forget - the worker thread will set the promise when done
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!) Fire and forget - the worker thread will set the promise when
+  // done
   std::thread([this, result_promise, request_future_ptr]() {
    try {
      // Wait for the request to be sent (the dispatcher callback to complete)
-      // Then wait for the response - this blocks a worker thread, NOT the dispatcher
+      // Then wait for the response - this blocks a worker thread, NOT the
+      // dispatcher
      while (!request_future_ptr->valid()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
 
 #ifndef NDEBUG
-      std::cerr << "[MCP-CLIENT] initializeProtocol: waiting for response on worker thread"
-                << std::endl << std::flush;
+      std::cerr << "[MCP-CLIENT] initializeProtocol: waiting for response on "
+                   "worker thread"
+                << std::endl
+                << std::flush;
 #endif
 
      auto response = request_future_ptr->get();
 #ifndef NDEBUG
-      std::cerr << "[MCP-CLIENT] initializeProtocol: got response" << std::endl << std::flush;
+      std::cerr << "[MCP-CLIENT] initializeProtocol: got response" << std::endl
+                << std::flush;
 #endif
 
      if (response.error.has_value()) {
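The same "post, then wait off-loop" shape recurs in every client method below, so here it is condensed into one place. This is a sketch under stated assumptions, not SDK code: EventLoop::post stands in for the real dispatcher, and the polling handshake on valid() mirrors what this patch does rather than a recommended synchronization primitive:

#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>

struct EventLoop {
  // Stand-in: the real dispatcher queues fn onto its own event-loop thread.
  void post(std::function<void()> fn) { fn(); }
};

template <typename Response>
std::future<Response> sendAndAwait(
    EventLoop& loop, std::function<std::future<Response>()> send) {
  auto result = std::make_shared<std::promise<Response>>();
  auto pending = std::make_shared<std::future<Response>>();

  // Step 1: issue the request from the loop thread; never block here.
  loop.post([pending, send]() { *pending = send(); });

  // Step 2: block on the response from a detached worker, not the loop.
  std::thread([result, pending]() {
    while (!pending->valid()) {  // same polling handshake as the patch
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    try {
      result->set_value(pending->get());
    } catch (...) {
      result->set_exception(std::current_exception());
    }
  }).detach();

  return result->get_future();
}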
@@ -649,24 +658,44 @@ void McpClient::processQueuedRequests() {
 
 // List available resources
 std::future<ListResourcesResult> McpClient::listResources(
     const optional<Cursor>& cursor) {
+  auto result_promise = std::make_shared<std::promise<ListResourcesResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   if (cursor.has_value()) {
     params["cursor"] = cursor.value();
   }
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("resources/list", mcp::make_optional(params));
-
-  // Create promise for ListResourcesResult
-  auto result_promise = std::make_shared<std::promise<ListResourcesResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("resources/list", mcp::make_optional(*params_ptr));
+  });
 
-  // Process response in dispatcher context
-  // Use shared_ptr to allow copying the lambda
-  auto shared_future =
-      std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
-  main_dispatcher_->post([shared_future, result_promise]() {
-    // Process the future result directly
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
     try {
-      auto response = shared_future->get();
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+
+      auto response = request_future_ptr->get();
       if (response.error.has_value()) {
         result_promise->set_exception(std::make_exception_ptr(
             std::runtime_error(response.error->message)));
@@ -679,7 +708,7 @@ std::future<ListResourcesResult> McpClient::listResources(
     } catch (...) {
       result_promise->set_exception(std::current_exception());
     }
-  });
+  }).detach();
 
   return result_promise->get_future();
 }
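One observation on the handshake above: std::future::valid() is not specified to be safe against a concurrent assignment to the same future object, so the sleep loop is a data race in principle even where it works in practice. A hedged alternative (not what this patch does) is to hand the request future to the worker through a promise, which gives a real synchronization point and removes the polling:

#include <functional>
#include <future>
#include <memory>
#include <thread>

template <typename Response>
std::future<Response> awaitViaHandoff(
    std::function<void(std::function<void()>)> post,  // dispatcher->post
    std::function<std::future<Response>()> send) {    // sendRequest(...)
  auto handoff = std::make_shared<std::promise<std::future<Response>>>();
  auto result = std::make_shared<std::promise<Response>>();

  post([handoff, send]() {
    try {
      handoff->set_value(send());
    } catch (...) {
      handoff->set_exception(std::current_exception());
    }
  });

  std::thread([handoff, result]() {
    try {
      // Blocks until the posted callback has run; no sleep loop needed.
      auto response_future = handoff->get_future().get();
      result->set_value(response_future.get());
    } catch (...) {
      result->set_exception(std::current_exception());
    }
  }).detach();

  return result->get_future();
}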
@@ -687,22 +716,42 @@ std::future<ListResourcesResult> McpClient::listResources(
 
 // Read resource content
 std::future<ReadResourceResult> McpClient::readResource(
     const std::string& uri) {
+  auto result_promise = std::make_shared<std::promise<ReadResourceResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   params["uri"] = uri;
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("resources/read", mcp::make_optional(params));
-
-  // Create promise for ReadResourceResult
-  auto result_promise = std::make_shared<std::promise<ReadResourceResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("resources/read", mcp::make_optional(*params_ptr));
+  });
 
-  // Process response in dispatcher context
-  // Use shared_ptr to allow copying the lambda
-  auto shared_future =
-      std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
-  main_dispatcher_->post([shared_future, result_promise]() {
-    // Process the future result directly
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
     try {
-      auto response = shared_future->get();
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+
+      auto response = request_future_ptr->get();
       if (response.error.has_value()) {
         result_promise->set_exception(std::make_exception_ptr(
             std::runtime_error(response.error->message)));
@@ -715,80 +764,109 @@ std::future<ReadResourceResult> McpClient::readResource(
     } catch (...) {
       result_promise->set_exception(std::current_exception());
     }
-  });
+  }).detach();
 
   return result_promise->get_future();
 }
 
 // Subscribe to resource updates
 std::future<VoidResult> McpClient::subscribeResource(const std::string& uri) {
+  auto result_promise = std::make_shared<std::promise<VoidResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   params["uri"] = uri;
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("resources/subscribe", mcp::make_optional(params));
-
-  // Convert Response to VoidResult
-  auto result_promise = std::make_shared<std::promise<VoidResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("resources/subscribe", mcp::make_optional(*params_ptr));
+  });
 
-  // Use dispatcher post pattern (reference architecture)
-  // Never use detached threads for async operations
-  // Wrap future in shared_ptr to allow capture in lambda
-  auto future_ptr = std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
+    try {
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
 
-  if (main_dispatcher_) {
-    main_dispatcher_->post([future_ptr, result_promise]() {
-      try {
-        auto response = future_ptr->get();
-        if (response.error.has_value()) {
-          result_promise->set_value(makeVoidError(*response.error));
-        } else {
-          result_promise->set_value(VoidResult(nullptr));
-        }
-      } catch (...) {
-        result_promise->set_exception(std::current_exception());
+      auto response = request_future_ptr->get();
+      if (response.error.has_value()) {
+        result_promise->set_value(makeVoidError(*response.error));
+      } else {
+        result_promise->set_value(VoidResult(nullptr));
       }
-    });
-  } else {
-    result_promise->set_exception(std::make_exception_ptr(
-        std::runtime_error("Dispatcher not available")));
-  }
+    } catch (...) {
+      result_promise->set_exception(std::current_exception());
+    }
+  }).detach();
 
   return result_promise->get_future();
 }
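The comments being deleted here said "Never use detached threads for async operations"; this patch reverses that rule, and the discipline that makes it tolerable is that the detached worker captures only shared_ptrs, so it co-owns everything it touches even if the McpClient and the caller are gone first. A toy illustration of that ownership rule (names are illustrative only):

#include <chrono>
#include <future>
#include <memory>
#include <thread>

int main() {
  auto result = std::make_shared<std::promise<int>>();
  std::future<int> f = result->get_future();
  {
    // The creating scope ends long before the worker runs; the shared_ptr
    // capture keeps the promise alive anyway. Capturing `this` or a raw
    // reference here would risk use-after-free.
    std::thread([result]() {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
      result->set_value(7);
    }).detach();
  }
  return f.get() == 7 ? 0 : 1;
}

Note that the posted dispatcher lambdas in this file still capture `this`, so the pattern continues to assume the McpClient outlives any callback queued on the dispatcher.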
 
 // Unsubscribe from resource updates
 std::future<VoidResult> McpClient::unsubscribeResource(const std::string& uri) {
+  auto result_promise = std::make_shared<std::promise<VoidResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   params["uri"] = uri;
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future =
-      sendRequest("resources/unsubscribe", mcp::make_optional(params));
-
-  // Convert Response to VoidResult
-  auto result_promise = std::make_shared<std::promise<VoidResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("resources/unsubscribe", mcp::make_optional(*params_ptr));
+  });
 
-  // Use dispatcher post pattern (reference architecture)
-  // Never use detached threads for async operations
-  // Wrap future in shared_ptr to allow capture in lambda
-  auto future_ptr = std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
+    try {
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
 
-  if (main_dispatcher_) {
-    main_dispatcher_->post([future_ptr, result_promise]() {
-      try {
-        auto response = future_ptr->get();
-        if (response.error.has_value()) {
-          result_promise->set_value(makeVoidError(*response.error));
-        } else {
-          result_promise->set_value(VoidResult(nullptr));
-        }
-      } catch (...) {
-        result_promise->set_exception(std::current_exception());
+      auto response = request_future_ptr->get();
+      if (response.error.has_value()) {
+        result_promise->set_value(makeVoidError(*response.error));
+      } else {
+        result_promise->set_value(VoidResult(nullptr));
       }
-    });
-  } else {
-    result_promise->set_exception(std::make_exception_ptr(
-        std::runtime_error("Dispatcher not available")));
-  }
+    } catch (...) {
+      result_promise->set_exception(std::current_exception());
+    }
+  }).detach();
 
   return result_promise->get_future();
 }
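Worth flagging the two error conventions living side by side in this file: methods with typed results (listResources, callTool, ...) surface a JSON-RPC error as an exception on the returned future, while the VoidResult methods (subscribe/unsubscribe, setLogLevel) deliver it as a value via makeVoidError. A compressed illustration with stand-in types, not the SDK's:

#include <future>
#include <iostream>
#include <stdexcept>
#include <string>

struct Error { std::string message; };

// Typed results: a JSON-RPC error becomes an exception on the future.
std::future<int> typedResult(bool fail) {
  std::promise<int> p;
  if (fail)
    p.set_exception(std::make_exception_ptr(std::runtime_error("boom")));
  else
    p.set_value(1);
  return p.get_future();
}

// Void operations: the error is delivered as a value, so callers inspect it
// instead of catching.
std::future<Error> voidResult(bool fail) {
  std::promise<Error> p;
  p.set_value(fail ? Error{"boom"} : Error{});
  return p.get_future();
}

int main() {
  try { typedResult(true).get(); } catch (const std::exception& e) {
    std::cout << "threw: " << e.what() << "\n";
  }
  std::cout << "carried: " << voidResult(true).get().message << "\n";
}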
 
@@ -796,24 +874,44 @@ std::future<VoidResult> McpClient::unsubscribeResource(const std::string& uri) {
 
 // List available tools
 std::future<ListToolsResult> McpClient::listTools(
     const optional<Cursor>& cursor) {
+  auto result_promise = std::make_shared<std::promise<ListToolsResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   if (cursor.has_value()) {
     params["cursor"] = cursor.value();
   }
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("tools/list", mcp::make_optional(params));
-
-  // Create promise for ListToolsResult
-  auto result_promise = std::make_shared<std::promise<ListToolsResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("tools/list", mcp::make_optional(*params_ptr));
+  });
 
-  // Process response in dispatcher context
-  // Use shared_ptr to allow copying the lambda
-  auto shared_future =
-      std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
-  main_dispatcher_->post([shared_future, result_promise]() {
-    // Process the future result directly
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
     try {
-      auto response = shared_future->get();
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+
+      auto response = request_future_ptr->get();
       if (response.error.has_value()) {
         result_promise->set_exception(std::make_exception_ptr(
             std::runtime_error(response.error->message)));
@@ -826,7 +924,7 @@ std::future<ListToolsResult> McpClient::listTools(
     } catch (...) {
       result_promise->set_exception(std::current_exception());
     }
-  });
+  }).detach();
 
   return result_promise->get_future();
 }
 
@@ -834,6 +932,22 @@ std::future<ListToolsResult> McpClient::listTools(
 
 // Call a tool
 std::future<CallToolResult> McpClient::callTool(
     const std::string& name, const optional<Metadata>& arguments) {
+  auto result_promise = std::make_shared<std::promise<CallToolResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   params["name"] = name;
   if (arguments.has_value()) {
@@ -842,20 +956,24 @@ std::future<CallToolResult> McpClient::callTool(
       params["arguments." + arg.first] = arg.second;
     }
   }
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("tools/call", mcp::make_optional(params));
-
-  // Create promise for CallToolResult
-  auto result_promise = std::make_shared<std::promise<CallToolResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("tools/call", mcp::make_optional(*params_ptr));
+  });
 
-  // Process response in dispatcher context
-  // Use shared_ptr to allow copying the lambda
-  auto shared_future =
-      std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
-  main_dispatcher_->post([shared_future, result_promise]() {
-    // Process the future result directly
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
     try {
-      auto response = shared_future->get();
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
+
+      auto response = request_future_ptr->get();
       if (response.error.has_value()) {
         result_promise->set_exception(std::make_exception_ptr(
             std::runtime_error(response.error->message)));
@@ -868,7 +986,7 @@ std::future<CallToolResult> McpClient::callTool(
     } catch (...) {
      result_promise->set_exception(std::current_exception());
     }
-  });
+  }).detach();
 
   return result_promise->get_future();
 }
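The callTool encoding above is easy to misread: arguments are not nested, they are flattened into the metadata map under "arguments."-prefixed keys. A small demonstration, assuming Metadata behaves like a string-keyed map (the tool name "file_search" and the values are made up for illustration):

#include <iostream>
#include <map>
#include <string>

int main() {
  std::map<std::string, std::string> params;  // stand-in for make_metadata()
  params["name"] = "file_search";

  std::map<std::string, std::string> arguments{{"query", "*.cc"},
                                               {"limit", "10"}};
  for (const auto& arg : arguments) {
    params["arguments." + arg.first] = arg.second;  // e.g. "arguments.query"
  }

  for (const auto& kv : params) {
    std::cout << kv.first << " = " << kv.second << "\n";
  }
}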
 
@@ -876,38 +994,54 @@ std::future<CallToolResult> McpClient::callTool(
 
 // List available prompts
 std::future<ListPromptsResult> McpClient::listPrompts(
     const optional<Cursor>& cursor) {
+  auto result_promise = std::make_shared<std::promise<ListPromptsResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   if (cursor.has_value()) {
     params["cursor"] = cursor.value();
   }
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("prompts/list", mcp::make_optional(params));
-
-  // Create promise for ListPromptsResult
-  auto result_promise = std::make_shared<std::promise<ListPromptsResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("prompts/list", mcp::make_optional(*params_ptr));
+  });
 
-  // Use dispatcher post pattern (reference architecture)
-  // Wrap future in shared_ptr to allow capture in lambda
-  auto future_ptr = std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
+    try {
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
 
-  if (main_dispatcher_) {
-    main_dispatcher_->post([future_ptr, result_promise]() {
-      try {
-        auto response = future_ptr->get();
-        ListPromptsResult result;
-        // Parse response into result structure
-        if (!response.error.has_value() && response.result.has_value()) {
-          // TODO: Proper parsing
-        }
-        result_promise->set_value(result);
-      } catch (...) {
-        result_promise->set_exception(std::current_exception());
+      auto response = request_future_ptr->get();
+      ListPromptsResult result;
+      // Parse response into result structure
+      if (!response.error.has_value() && response.result.has_value()) {
+        // TODO: Proper parsing
      }
-    });
-  } else {
-    result_promise->set_exception(std::make_exception_ptr(
-        std::runtime_error("Dispatcher not available")));
-  }
+      result_promise->set_value(result);
+    } catch (...) {
+      result_promise->set_exception(std::current_exception());
+    }
+  }).detach();
 
   return result_promise->get_future();
 }
 
@@ -915,6 +1049,22 @@ std::future<ListPromptsResult> McpClient::listPrompts(
 
 // Get a prompt
 std::future<GetPromptResult> McpClient::getPrompt(
     const std::string& name, const optional<Metadata>& arguments) {
+  auto result_promise = std::make_shared<std::promise<GetPromptResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   params["name"] = name;
   if (arguments.has_value()) {
@@ -923,34 +1073,34 @@ std::future<GetPromptResult> McpClient::getPrompt(
       params["arguments." + arg.first] = arg.second;
     }
   }
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("prompts/get", mcp::make_optional(params));
-
-  // Create promise for GetPromptResult
-  auto result_promise = std::make_shared<std::promise<GetPromptResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("prompts/get", mcp::make_optional(*params_ptr));
+  });
 
-  // Use dispatcher post pattern (reference architecture)
-  // Wrap future in shared_ptr to allow capture in lambda
-  auto future_ptr = std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
+    try {
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
 
-  if (main_dispatcher_) {
-    main_dispatcher_->post([future_ptr, result_promise]() {
-      try {
-        auto response = future_ptr->get();
-        GetPromptResult result;
-        // Parse response into result structure
-        if (!response.error.has_value() && response.result.has_value()) {
-          // TODO: Proper parsing
-        }
-        result_promise->set_value(result);
-      } catch (...) {
-        result_promise->set_exception(std::current_exception());
+      auto response = request_future_ptr->get();
+      GetPromptResult result;
+      // Parse response into result structure
+      if (!response.error.has_value() && response.result.has_value()) {
+        // TODO: Proper parsing
      }
-    });
-  } else {
-    result_promise->set_exception(std::make_exception_ptr(
-        std::runtime_error("Dispatcher not available")));
-  }
+      result_promise->set_value(result);
+    } catch (...) {
+      result_promise->set_exception(std::current_exception());
+    }
+  }).detach();
 
   return result_promise->get_future();
 }
 
@@ -958,36 +1108,51 @@ std::future<GetPromptResult> McpClient::getPrompt(
 
 // Set logging level
 std::future<VoidResult> McpClient::setLogLevel(
     enums::LoggingLevel::Value level) {
+  auto result_promise = std::make_shared<std::promise<VoidResult>>();
+
+  if (!main_dispatcher_) {
+    result_promise->set_exception(
+        std::make_exception_ptr(std::runtime_error("No dispatcher")));
+    return result_promise->get_future();
+  }
+
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we send the request in the dispatcher, then wait on a worker
+  // thread.
+
+  auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();
+
+  // Prepare params before posting to dispatcher
   auto params = make_metadata();
   params["level"] = static_cast<int>(level);
+  auto params_ptr = std::make_shared<Metadata>(std::move(params));
 
-  auto future = sendRequest("logging/setLevel", mcp::make_optional(params));
-
-  // Convert Response to VoidResult
-  auto result_promise = std::make_shared<std::promise<VoidResult>>();
+  // Step 1: Post to dispatcher to send the request (non-blocking)
+  main_dispatcher_->post([this, request_future_ptr, params_ptr]() {
+    *request_future_ptr =
+        sendRequest("logging/setLevel", mcp::make_optional(*params_ptr));
+  });
 
-  // Use dispatcher post pattern (reference architecture)
-  // Never use detached threads for async operations
-  // Wrap future in shared_ptr to allow capture in lambda
-  auto future_ptr = std::make_shared<std::future<jsonrpc::Response>>(std::move(future));
+  // Step 2: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([result_promise, request_future_ptr]() {
+    try {
+      // Wait for the request to be sent
+      while (!request_future_ptr->valid()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      }
 
-  if (main_dispatcher_) {
-    main_dispatcher_->post([future_ptr, result_promise]() {
-      try {
-        auto response = future_ptr->get();
-        if (response.error.has_value()) {
-          result_promise->set_value(makeVoidError(*response.error));
-        } else {
-          result_promise->set_value(VoidResult(nullptr));
-        }
-      } catch (...) {
-        result_promise->set_exception(std::current_exception());
+      auto response = request_future_ptr->get();
+      if (response.error.has_value()) {
+        result_promise->set_value(makeVoidError(*response.error));
+      } else {
+        result_promise->set_value(VoidResult(nullptr));
      }
-    });
-  } else {
-    result_promise->set_exception(std::make_exception_ptr(
-        std::runtime_error("Dispatcher not available")));
-  }
+    } catch (...) {
+      result_promise->set_exception(std::current_exception());
+    }
+  }).detach();
 
   return result_promise->get_future();
 }
 
@@ -1067,31 +1232,31 @@ std::future<CreateMessageResult> McpClient::createMessage(
   auto result_promise = std::make_shared<std::promise<CreateMessageResult>>();
   auto result_future = result_promise->get_future();
 
-  // Use dispatcher post pattern (reference architecture)
-  if (main_dispatcher_) {
-    main_dispatcher_->post([context, result_promise]() {
-      try {
-        auto response = context->promise.get_future().get();
-        CreateMessageResult result;
-        // Parse response into result structure
-        if (!response.error.has_value() && response.result.has_value()) {
-          // Extract created message
-          TextContent text_content;
-          text_content.type = "text";
-          text_content.text = "";
-          result.content = text_content;
-          result.model = "unknown";
-          result.role = enums::Role::ASSISTANT;
-        }
-        result_promise->set_value(result);
-      } catch (...) {
-        result_promise->set_exception(std::current_exception());
-      }
-    });
-  } else {
-    result_promise->set_exception(std::make_exception_ptr(
-        std::runtime_error("Dispatcher not available")));
-  }
+  // CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
+  // That would deadlock because the dispatcher thread processes Read events.
+  // Instead, we wait on a worker thread.
+
+  // Step: Use std::thread to wait for response on a worker thread (not
+  // dispatcher!)
+  std::thread([context, result_promise]() {
+    try {
+      auto response = context->promise.get_future().get();
+      CreateMessageResult result;
+      // Parse response into result structure
+      if (!response.error.has_value() && response.result.has_value()) {
+        // Extract created message
+        TextContent text_content;
+        text_content.type = "text";
+        text_content.text = "";
+        result.content = text_content;
+        result.model = "unknown";
+        result.role = enums::Role::ASSISTANT;
+      }
+      result_promise->set_value(result);
+    } catch (...) {
+      result_promise->set_exception(std::current_exception());
+    }
+  }).detach();
 
   return result_future;
 }
diff --git a/src/event/libevent_dispatcher.cc b/src/event/libevent_dispatcher.cc
index f82a0a96..8b74ed45 100644
--- a/src/event/libevent_dispatcher.cc
+++ b/src/event/libevent_dispatcher.cc
@@ -346,10 +346,10 @@ void LibeventDispatcher::run(RunType type) {
   thread_id_ = std::this_thread::get_id();
 
 #ifndef NDEBUG
-  const char* type_str = (type == RunType::Block) ? "Block"
-                         : (type == RunType::NonBlock) ? "NonBlock"
+  const char* type_str = (type == RunType::Block)          ? "Block"
+                         : (type == RunType::NonBlock)     ? "NonBlock"
                          : (type == RunType::RunUntilExit) ? "RunUntilExit"
-                         : "Unknown";
+                                                           : "Unknown";
   std::cerr << "[LIBEVENT] run(): starting, type=" << type_str << std::endl;
 #endif
 
@@ -468,10 +468,13 @@ void LibeventDispatcher::postWakeupCallback(libevent_socket_t fd,
 
 #ifndef NDEBUG
   std::cerr << "[LIBEVENT] postWakeupCallback: returning, active events="
-            << event_base_get_num_events(dispatcher->base_, EVENT_BASE_COUNT_ACTIVE)
+            << event_base_get_num_events(dispatcher->base_,
+                                         EVENT_BASE_COUNT_ACTIVE)
             << " added="
-            << event_base_get_num_events(dispatcher->base_, EVENT_BASE_COUNT_ADDED)
-            << std::endl << std::flush;
+            << event_base_get_num_events(dispatcher->base_,
+                                         EVENT_BASE_COUNT_ADDED)
+            << std::endl
+            << std::flush;
 #endif
 }
 
@@ -678,8 +681,8 @@ void LibeventDispatcher::FileEventImpl::assignEvents(uint32_t events) {
                     &FileEventImpl::eventCallback, this);
   int add_result = event_add(event_, nullptr);
 #ifndef NDEBUG
-  std::cerr << "[LIBEVENT] assignEvents: fd=" << fd_
-            << " libevent_events=0x" << std::hex << libevent_events << std::dec
+  std::cerr << "[LIBEVENT] assignEvents: fd=" << fd_ << " libevent_events=0x"
+            << std::hex << libevent_events << std::dec
             << " event_add=" << add_result << std::endl;
 #endif
 #endif
 
@@ -691,9 +694,9 @@ void LibeventDispatcher::FileEventImpl::eventCallback(libevent_socket_t fd,
   auto* file_event = static_cast<FileEventImpl*>(arg);
 
 #ifndef NDEBUG
-  std::cerr << "[LIBEVENT] eventCallback: fd=" << fd << " events=0x"
-            << std::hex << events << std::dec
-            << " enabled=" << file_event->enabled_events_ << std::endl;
+  std::cerr << "[LIBEVENT] eventCallback: fd=" << fd << " events=0x" << std::hex
+            << events << std::dec << " enabled=" << file_event->enabled_events_
+            << std::endl;
 #endif
 
   // Update approximate time before callback
diff --git a/src/filter/http_codec_filter.cc b/src/filter/http_codec_filter.cc
index 92c772fc..d4476fc8 100644
--- a/src/filter/http_codec_filter.cc
+++ b/src/filter/http_codec_filter.cc
@@ -302,12 +302,15 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
 
 #ifndef NDEBUG
   std::cerr << "[DEBUG] HttpCodecFilter::onWrite client state="
-            << HttpCodecStateMachine::getStateName(current_state) << std::endl;
+            << HttpCodecStateMachine::getStateName(current_state)
+            << std::endl;
 #endif
 
   // Check if we can send a request
-  // Client can send when idle or after receiving a complete response
-  if (current_state == HttpCodecState::Idle) {
+  // Client can send when idle or while waiting for a response (HTTP pipelining)
+  // HTTP/1.1 allows multiple requests to be sent before receiving responses
+  if (current_state == HttpCodecState::Idle ||
+      current_state == HttpCodecState::WaitingForResponse) {
     // Save the original request body (JSON-RPC)
     size_t body_length = data.length();
     std::string body_data(
@@ -334,10 +337,13 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
       std::cerr << "[DEBUG] HttpCodecFilter client sending HTTP request: "
                 << request_str.substr(0, 200) << "..." << std::endl;
 
-      // Update state machine - the entire request (headers + body) is formatted
-      // in one call, so the request is complete after this
-      state_machine_->handleEvent(HttpCodecEvent::RequestBegin);
-      state_machine_->handleEvent(HttpCodecEvent::RequestComplete);
+      // Update state machine - only transition if we're in Idle state
+      // For pipelined requests (when already WaitingForResponse), just send
+      // without additional state transitions
+      if (current_state == HttpCodecState::Idle) {
+        state_machine_->handleEvent(HttpCodecEvent::RequestBegin);
+        state_machine_->handleEvent(HttpCodecEvent::RequestComplete);
+      }
     }
   }
   return network::FilterStatus::Continue;
@@ -576,7 +582,8 @@ http::ParserCallbackResult HttpCodecFilter::ParserCallbacks::onBody(
     parent_.current_stream_->body.append(data, length);
 #ifndef NDEBUG
     std::cerr << "[DEBUG] ParserCallbacks::onBody - total body now "
-              << parent_.current_stream_->body.length() << " bytes" << std::endl;
+              << parent_.current_stream_->body.length() << " bytes"
+              << std::endl;
 #endif
   }
   // Trigger body data event based on mode
@@ -592,8 +599,9 @@ http::ParserCallbackResult
 HttpCodecFilter::ParserCallbacks::onMessageComplete() {
 #ifndef NDEBUG
   std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - is_server="
-            << parent_.is_server_
-            << " body_len=" << (parent_.current_stream_ ? parent_.current_stream_->body.length() : 0)
+            << parent_.is_server_ << " body_len="
+            << (parent_.current_stream_ ? parent_.current_stream_->body.length()
+                                        : 0)
             << std::endl;
 #endif
   // Trigger message complete event based on mode
@@ -614,8 +622,9 @@ HttpCodecFilter::ParserCallbacks::onMessageComplete() {
   } else {
 #ifndef NDEBUG
-    std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - NO BODY to forward!"
-              << std::endl;
+    std::cerr
+        << "[DEBUG] ParserCallbacks::onMessageComplete - NO BODY to forward!"
+        << std::endl;
 #endif
   }
 
diff --git a/src/filter/http_routing_filter.cc b/src/filter/http_routing_filter.cc
index a263c255..cf305eef 100644
--- a/src/filter/http_routing_filter.cc
+++ b/src/filter/http_routing_filter.cc
@@ -54,8 +54,9 @@ void HttpRoutingFilter::onHeaders(
 
   // In client mode, we're receiving responses, not requests - skip routing
   if (!is_server_) {
-    std::cerr << "[DEBUG] HttpRoutingFilter: client mode, passing through response"
-              << std::endl;
+    std::cerr
+        << "[DEBUG] HttpRoutingFilter: client mode, passing through response"
+        << std::endl;
     if (next_callbacks_) {
       next_callbacks_->onHeaders(headers, keep_alive);
     }
diff --git a/src/filter/http_sse_filter_chain_factory.cc b/src/filter/http_sse_filter_chain_factory.cc
index 5c71140a..33308c90 100644
--- a/src/filter/http_sse_filter_chain_factory.cc
+++ b/src/filter/http_sse_filter_chain_factory.cc
@@ -350,8 +350,9 @@ class HttpSseJsonRpcProtocolFilter
   }
 
 #ifndef NDEBUG
-  std::cerr << "[DEBUG] HttpSseJsonRpcProtocolFilter::onWrite - calling http_filter"
-            << std::endl;
+  std::cerr
+      << "[DEBUG] HttpSseJsonRpcProtocolFilter::onWrite - calling http_filter"
+      << std::endl;
 #endif
   // HTTP filter adds headers/framing for normal HTTP responses
   return http_filter_->onWrite(data, end_stream);
@@ -636,7 +637,8 @@ class HttpSseJsonRpcProtocolFilter
   McpProtocolCallbacks& mcp_callbacks_;
   bool is_server_;
   bool is_sse_mode_{false};
-  bool client_accepts_sse_{false};  // Track if client supports SSE (Accept header)
+  bool client_accepts_sse_{
+      false};  // Track if client supports SSE (Accept header)
   bool sse_headers_written_{
       false};  // Track if HTTP headers sent for SSE stream
diff --git a/src/json/json_serialization.cc b/src/json/json_serialization.cc
index 770a7fa6..6ae5dbc2 100644
--- a/src/json/json_serialization.cc
+++ b/src/json/json_serialization.cc
@@ -122,6 +122,10 @@ JsonValue serialize_ResponseResult(const jsonrpc::ResponseResult& result) {
         builder.add(to_json(resource));
       }
       json_result = builder.build();
+      },
+      [&json_result](const ListResourcesResult& list_result) {
+        // ListResourcesResult is a full result object with resources array
+        json_result = to_json(list_result);
       });
 
   return json_result;
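Finally, a condensed model of the http_codec_filter.cc behavior change, since the send guard and the state-machine update are easy to conflate: sending is now allowed from both Idle and WaitingForResponse, but only the Idle path advances the state machine, which keeps responses matched to requests in FIFO order. The enum and predicate names below mirror the patch; everything else is elided:

#include <cassert>

enum class HttpCodecState { Idle, WaitingForResponse, Other };

bool canSendRequest(HttpCodecState s) {
  return s == HttpCodecState::Idle || s == HttpCodecState::WaitingForResponse;
}

bool shouldAdvanceStateMachine(HttpCodecState s) {
  // Pipelined requests are sent without extra transitions, so the codec
  // still expects the response to the *first* outstanding request next.
  return s == HttpCodecState::Idle;
}

int main() {
  assert(canSendRequest(HttpCodecState::Idle));
  assert(canSendRequest(HttpCodecState::WaitingForResponse));
  assert(shouldAdvanceStateMachine(HttpCodecState::Idle));
  assert(!shouldAdvanceStateMachine(HttpCodecState::WaitingForResponse));
  return 0;
}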