Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 45 additions & 17 deletions src/client/mcp_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,53 @@ std::future<InitializeResult> McpClient::initializeProtocol() {
}

// Defer all protocol operations to dispatcher thread
main_dispatcher_->post([this, result_promise]() {
// CRITICAL: We must NOT block on future.get() inside the dispatcher callback!
// That would deadlock because the dispatcher thread processes Read events.
// Instead, we send the request in the dispatcher, then wait on a worker thread.

auto request_future_ptr = std::make_shared<std::future<jsonrpc::Response>>();

// Step 1: Post to dispatcher to send the request (non-blocking)
main_dispatcher_->post([this, request_future_ptr]() {
// Notify protocol state machine that initialization is starting
if (protocol_state_machine_) {
protocol_state_machine_->handleEvent(
protocol::McpProtocolEvent::INITIALIZE_REQUESTED);
}

// Build initialize request with client capabilities
auto init_params = make_metadata();
init_params["protocolVersion"] = config_.protocol_version;
init_params["clientName"] = config_.client_name;
init_params["clientVersion"] = config_.client_version;

// Send request - do NOT block here!
*request_future_ptr = sendRequest("initialize", mcp::make_optional(init_params));
#ifndef NDEBUG
std::cerr << "[MCP-CLIENT] initializeProtocol: request sent, callback returning"
<< std::endl << std::flush;
#endif
// Callback returns immediately - response will be processed elsewhere
});

// Step 2: Use std::async to wait for response on a worker thread (not dispatcher!)
// Fire and forget - the async thread will set the promise when done
std::thread([this, result_promise, request_future_ptr]() {
try {
// Notify protocol state machine that initialization is starting
if (protocol_state_machine_) {
protocol_state_machine_->handleEvent(
protocol::McpProtocolEvent::INITIALIZE_REQUESTED);
// Wait for the request to be sent (the dispatcher callback to complete)
// Then wait for the response - this blocks a worker thread, NOT the dispatcher
while (!request_future_ptr->valid()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

// Build initialize request with client capabilities
// For now, use simple parameters - full serialization needs JSON
// conversion
auto init_params = make_metadata();
init_params["protocolVersion"] = config_.protocol_version;
init_params["clientName"] = config_.client_name;
init_params["clientVersion"] = config_.client_version;

// Send request and get response
auto future = sendRequest("initialize", mcp::make_optional(init_params));
auto response = future.get();
#ifndef NDEBUG
std::cerr << "[MCP-CLIENT] initializeProtocol: waiting for response on worker thread"
<< std::endl << std::flush;
#endif
auto response = request_future_ptr->get();
#ifndef NDEBUG
std::cerr << "[MCP-CLIENT] initializeProtocol: got response" << std::endl << std::flush;
#endif

if (response.error.has_value()) {
result_promise->set_exception(std::make_exception_ptr(
Expand Down Expand Up @@ -392,7 +420,7 @@ std::future<InitializeResult> McpClient::initializeProtocol() {
} catch (...) {
result_promise->set_exception(std::current_exception());
}
});
}).detach(); // Detach the thread - it will set the promise when done

return result_promise->get_future();
}
Expand Down
70 changes: 67 additions & 3 deletions src/event/libevent_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,22 @@ void LibeventDispatcher::post(PostCb callback) {
post_callbacks_.push(std::move(callback));
}

if (need_wakeup && !isThreadSafe()) {
// Wake up the event loop
#ifndef NDEBUG
bool is_thread_safe = isThreadSafe();
std::cerr << "[LIBEVENT] post(): need_wakeup=" << need_wakeup
<< " isThreadSafe=" << is_thread_safe
<< " thread=" << std::this_thread::get_id()
<< " dispatcher_thread=" << thread_id_ << std::endl;
#endif

if (need_wakeup) {
// Wake up the event loop - the queue was empty, so the event loop
// may be blocked waiting for events. We need to wake it up even if
// we're on the dispatcher thread, because event_base_loop() in Block
// mode won't return to check post_callbacks_ until an event fires.
#ifndef NDEBUG
std::cerr << "[LIBEVENT] post(): waking up event loop" << std::endl;
#endif
char byte = 1;
#ifdef _WIN32
int rc = send(wakeup_fd_[1], &byte, 1, 0);
Expand Down Expand Up @@ -331,6 +345,14 @@ void LibeventDispatcher::run(RunType type) {
exit_requested_ = false;
thread_id_ = std::this_thread::get_id();

#ifndef NDEBUG
const char* type_str = (type == RunType::Block) ? "Block"
: (type == RunType::NonBlock) ? "NonBlock"
: (type == RunType::RunUntilExit) ? "RunUntilExit"
: "Unknown";
std::cerr << "[LIBEVENT] run(): starting, type=" << type_str << std::endl;
#endif

// Run any pending post callbacks before starting
runPostCallbacks();

Expand All @@ -348,13 +370,18 @@ void LibeventDispatcher::run(RunType type) {
event_base_loop(base_, EVLOOP_ONCE);
runPostCallbacks();
}
#ifndef NDEBUG
std::cerr << "[LIBEVENT] run(): RunUntilExit loop ended" << std::endl;
#endif
return;
}

updateApproximateMonotonicTime();
int result = event_base_loop(base_, flags);

#ifndef NDEBUG
std::cerr << "[LIBEVENT] run(): event_base_loop returned " << result
<< " (type=" << type_str << ")" << std::endl;
if (result == 1) {
std::cerr << "[LIBEVENT] Warning: event_base_loop returned 1 (no events)"
<< std::endl;
Expand Down Expand Up @@ -423,6 +450,10 @@ void LibeventDispatcher::postWakeupCallback(libevent_socket_t fd,
void* arg) {
auto* dispatcher = static_cast<LibeventDispatcher*>(arg);

#ifndef NDEBUG
std::cerr << "[LIBEVENT] postWakeupCallback: entering" << std::endl;
#endif

// Drain the pipe
char buffer[256];
#ifdef _WIN32
Expand All @@ -434,6 +465,14 @@ void LibeventDispatcher::postWakeupCallback(libevent_socket_t fd,
}

dispatcher->runPostCallbacks();

#ifndef NDEBUG
std::cerr << "[LIBEVENT] postWakeupCallback: returning, active events="
<< event_base_get_num_events(dispatcher->base_, EVENT_BASE_COUNT_ACTIVE)
<< " added="
<< event_base_get_num_events(dispatcher->base_, EVENT_BASE_COUNT_ADDED)
<< std::endl << std::flush;
#endif
}

void LibeventDispatcher::runPostCallbacks() {
Expand All @@ -443,6 +482,13 @@ void LibeventDispatcher::runPostCallbacks() {
callbacks.swap(post_callbacks_);
}

#ifndef NDEBUG
if (!callbacks.empty()) {
std::cerr << "[LIBEVENT] runPostCallbacks(): running " << callbacks.size()
<< " callbacks" << std::endl;
}
#endif

while (!callbacks.empty()) {
callbacks.front()();
callbacks.pop();
Expand Down Expand Up @@ -545,9 +591,16 @@ void LibeventDispatcher::FileEventImpl::activate(uint32_t events) {
}

void LibeventDispatcher::FileEventImpl::setEnabled(uint32_t events) {
#ifndef NDEBUG
std::cerr << "[LIBEVENT] setEnabled: fd=" << fd_ << " events=" << events
<< " (prev=" << enabled_events_ << ")" << std::endl;
#endif
// For edge-triggered, always update even if mask unchanged
// This forces re-computation of readable/writable state
if (trigger_ != FileTriggerType::Edge && enabled_events_ == events) {
#ifndef NDEBUG
std::cerr << "[LIBEVENT] setEnabled: skipping (no change)" << std::endl;
#endif
return;
}
enabled_events_ = events;
Expand Down Expand Up @@ -623,7 +676,12 @@ void LibeventDispatcher::FileEventImpl::assignEvents(uint32_t events) {
#else
event_assign(event_, dispatcher_.base(), fd_, libevent_events,
&FileEventImpl::eventCallback, this);
event_add(event_, nullptr);
int add_result = event_add(event_, nullptr);
#ifndef NDEBUG
std::cerr << "[LIBEVENT] assignEvents: fd=" << fd_
<< " libevent_events=0x" << std::hex << libevent_events << std::dec
<< " event_add=" << add_result << std::endl;
#endif
#endif
}

Expand All @@ -632,6 +690,12 @@ void LibeventDispatcher::FileEventImpl::eventCallback(libevent_socket_t fd,
void* arg) {
auto* file_event = static_cast<FileEventImpl*>(arg);

#ifndef NDEBUG
std::cerr << "[LIBEVENT] eventCallback: fd=" << fd << " events=0x"
<< std::hex << events << std::dec
<< " enabled=" << file_event->enabled_events_ << std::endl;
#endif

// Update approximate time before callback
file_event->dispatcher_.updateApproximateMonotonicTime();

Expand Down
67 changes: 46 additions & 21 deletions src/filter/http_codec_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,17 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
}
response << version_str.str() << " 200 OK\r\n";

// Detect SSE either via Accept header or the formatted payload
// Detect SSE ONLY by the formatted payload, NOT by Accept header.
// The Accept header just indicates client SUPPORTS SSE, not that we
// should use it. For JSON-RPC request/response, we need proper HTTP
// with Content-Length. Only use SSE format if the payload is already
// SSE-formatted (contains event:/data: lines).
bool is_sse_response = false;
if (current_stream_) {
std::string accept_lower = current_stream_->accept_header;
std::transform(accept_lower.begin(), accept_lower.end(),
accept_lower.begin(), ::tolower);
if (accept_lower.find("text/event-stream") != std::string::npos) {
is_sse_response = true;
}
}

if (!is_sse_response) {
// Heuristic: SSE payloads contain event/data lines
std::string_view payload_view(body_data);
if (payload_view.find("event:") != std::string_view::npos &&
payload_view.find("data:") != std::string_view::npos) {
is_sse_response = true;
}
// Heuristic: SSE payloads contain event/data lines
std::string_view payload_view(body_data);
if (payload_view.find("event:") != std::string_view::npos &&
payload_view.find("data:") != std::string_view::npos) {
is_sse_response = true;
}

if (is_sse_response) {
Expand All @@ -276,6 +269,11 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
// Regular JSON response
response << "Content-Type: application/json\r\n";
response << "Content-Length: " << body_length << "\r\n";
#ifndef NDEBUG
std::cerr << "[HTTP-CODEC] onWrite: Content-Length=" << body_length
<< " body_preview=" << body_data.substr(0, 50) << "..."
<< std::endl;
#endif
response << "Cache-Control: no-cache\r\n";
if (current_stream_) {
response << "Connection: "
Expand All @@ -302,6 +300,11 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
// Client mode: format as HTTP POST request
auto current_state = state_machine_->currentState();

#ifndef NDEBUG
std::cerr << "[DEBUG] HttpCodecFilter::onWrite client state="
<< HttpCodecStateMachine::getStateName(current_state) << std::endl;
#endif

// Check if we can send a request
// Client can send when idle or after receiving a complete response
if (current_state == HttpCodecState::Idle) {
Expand Down Expand Up @@ -331,11 +334,10 @@ network::FilterStatus HttpCodecFilter::onWrite(Buffer& data, bool end_stream) {
std::cerr << "[DEBUG] HttpCodecFilter client sending HTTP request: "
<< request_str.substr(0, 200) << "..." << std::endl;

// Update state machine
// Update state machine - the entire request (headers + body) is formatted
// in one call, so the request is complete after this
state_machine_->handleEvent(HttpCodecEvent::RequestBegin);
if (end_stream) {
state_machine_->handleEvent(HttpCodecEvent::RequestComplete);
}
state_machine_->handleEvent(HttpCodecEvent::RequestComplete);
}
}
return network::FilterStatus::Continue;
Expand Down Expand Up @@ -566,8 +568,16 @@ HttpCodecFilter::ParserCallbacks::onHeadersComplete() {

http::ParserCallbackResult HttpCodecFilter::ParserCallbacks::onBody(
const char* data, size_t length) {
#ifndef NDEBUG
std::cerr << "[DEBUG] ParserCallbacks::onBody - received " << length
<< " bytes" << std::endl;
#endif
if (parent_.current_stream_) {
parent_.current_stream_->body.append(data, length);
#ifndef NDEBUG
std::cerr << "[DEBUG] ParserCallbacks::onBody - total body now "
<< parent_.current_stream_->body.length() << " bytes" << std::endl;
#endif
}
// Trigger body data event based on mode
if (parent_.is_server_) {
Expand All @@ -580,6 +590,12 @@ http::ParserCallbackResult HttpCodecFilter::ParserCallbacks::onBody(

http::ParserCallbackResult
HttpCodecFilter::ParserCallbacks::onMessageComplete() {
#ifndef NDEBUG
std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - is_server="
<< parent_.is_server_
<< " body_len=" << (parent_.current_stream_ ? parent_.current_stream_->body.length() : 0)
<< std::endl;
#endif
// Trigger message complete event based on mode
if (parent_.is_server_) {
parent_.state_machine_->handleEvent(HttpCodecEvent::RequestComplete);
Expand All @@ -589,9 +605,18 @@ HttpCodecFilter::ParserCallbacks::onMessageComplete() {

// Send body to callbacks
if (parent_.current_stream_ && !parent_.current_stream_->body.empty()) {
#ifndef NDEBUG
std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - forwarding body"
<< std::endl;
#endif
if (parent_.message_callbacks_) {
parent_.message_callbacks_->onBody(parent_.current_stream_->body, true);
}
} else {
#ifndef NDEBUG
std::cerr << "[DEBUG] ParserCallbacks::onMessageComplete - NO BODY to forward!"
<< std::endl;
#endif
}

if (parent_.message_callbacks_) {
Expand Down
15 changes: 13 additions & 2 deletions src/filter/http_routing_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,20 @@ void HttpRoutingFilter::registerDefaultHandler(HandlerFunc handler) {
void HttpRoutingFilter::onHeaders(
const std::map<std::string, std::string>& headers, bool keep_alive) {
std::cerr << "[DEBUG] HttpRoutingFilter::onHeaders called with "
<< headers.size() << " headers" << std::endl;
<< headers.size() << " headers, is_server=" << is_server_
<< std::endl;

// In client mode, we're receiving responses, not requests - skip routing
if (!is_server_) {
std::cerr << "[DEBUG] HttpRoutingFilter: client mode, passing through response"
<< std::endl;
if (next_callbacks_) {
next_callbacks_->onHeaders(headers, keep_alive);
}
return;
}

// Stateless processing - make routing decision immediately
// Server mode: route incoming requests
std::string method = extractMethod(headers);
std::string path = extractPath(headers);

Expand Down
Loading
Loading