From 56aa83da037bdc7abc141bb6e56c003e034136f1 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 25 Feb 2026 18:30:12 +0000 Subject: [PATCH 1/8] For MVEC, replace future queue with a single future that will timeout internally for each message response type. --- .../include/mvec_lib/mvec_relay_socketcan.hpp | 51 ++++--- mvec/mvec_lib/src/mvec_relay_socketcan.cpp | 133 ++++++++++-------- .../mvec_lib/test/mvec_socketcan_hardware.cpp | 22 ++- mvec/mvec_ros2/src/mvec_node.cpp | 69 +++++---- 4 files changed, 168 insertions(+), 107 deletions(-) diff --git a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp index 8a07229..5d35f73 100644 --- a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp +++ b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp @@ -15,10 +15,11 @@ #ifndef MVEC_LIB__MVEC_RELAY_SOCKETCAN_HPP_ #define MVEC_LIB__MVEC_RELAY_SOCKETCAN_HPP_ +#include #include #include #include -#include +#include #include "mvec_lib/mvec_relay.hpp" #include "socketcan_adapter/socketcan_adapter.hpp" @@ -31,10 +32,17 @@ namespace polymath::sygnal class MvecRelaySocketcan { public: - /// @brief Constructor + /// @brief Constructor with default 500ms response timeout /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication explicit MvecRelaySocketcan(std::shared_ptr socketcan_adapter); + /// @brief Constructor with custom response timeout + /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication + /// @param response_timeout Timeout for all MVEC response types + MvecRelaySocketcan( + std::shared_ptr socketcan_adapter, + std::chrono::milliseconds response_timeout); + /// @brief Parse incoming CAN frame and fulfill waiting promises /// @param frame CAN frame to parse /// @return Message type that was parsed @@ -49,16 +57,16 @@ class MvecRelaySocketcan void clear_relay(); /// @brief Query current relay states asynchronously - /// @return Future that will contain relay query reply - std::future get_relay_state(); + /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. + std::future> get_relay_state(); /// @brief Send relay command and wait for confirmation - /// @return Future that will contain command reply - std::future send_relay_command(); + /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. + std::future> send_relay_command(); /// @brief Query device population (which relays/fuses are installed) - /// @return Future that will contain population reply - std::future get_relay_population(); + /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. + std::future> get_relay_population(); /// @brief Get last received fuse status message /// @return Optional containing fuse status if valid data available @@ -72,21 +80,32 @@ class MvecRelaySocketcan /// @return Optional containing error status if valid data available const std::optional get_last_error_status(); + /// @brief Default timeout for MVEC responses (500ms) + static constexpr std::chrono::milliseconds MVEC_DEFAULT_RESPONSE_TIMEOUT{500}; + private: /// @brief SocketCAN adapter for CAN communication std::shared_ptr socketcan_adapter_; /// @brief Core MVEC relay implementation MvecRelay relay_impl_; - /// @brief Queue of promises waiting for relay query responses - std::queue> query_reply_promises_; - /// @brief Queue of promises waiting for relay command responses - std::queue> command_reply_promises_; - /// @brief Queue of promises waiting for population query responses - std::queue> population_reply_promises_; + /// @brief Timeout for all MVEC response types + std::chrono::milliseconds response_timeout_; + + /// @brief Single-slot promise for relay query response + std::optional>> query_reply_promise_; + std::chrono::steady_clock::time_point query_send_time_; + std::mutex query_mutex_; + + /// @brief Single-slot promise for relay command response + std::optional>> command_reply_promise_; + std::chrono::steady_clock::time_point command_send_time_; + std::mutex command_mutex_; - /// @brief Mutex protecting promise queues for thread safety - std::mutex promises_mutex_; + /// @brief Single-slot promise for population query response + std::optional>> population_reply_promise_; + std::chrono::steady_clock::time_point population_send_time_; + std::mutex population_mutex_; }; } // namespace polymath::sygnal diff --git a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp index 50bf22b..864f8bd 100644 --- a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp +++ b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp @@ -21,45 +21,46 @@ namespace polymath::sygnal { MvecRelaySocketcan::MvecRelaySocketcan(std::shared_ptr socketcan_adapter) +: MvecRelaySocketcan(socketcan_adapter, MVEC_DEFAULT_RESPONSE_TIMEOUT) +{} + +MvecRelaySocketcan::MvecRelaySocketcan( + std::shared_ptr socketcan_adapter, + std::chrono::milliseconds response_timeout) : socketcan_adapter_(socketcan_adapter) , relay_impl_() +, response_timeout_(response_timeout) {} MvecMessageType MvecRelaySocketcan::parse(const socketcan::CanFrame & frame) { MvecMessageType message_type = relay_impl_.parseMessage(frame); - // Check if we received expected response types and fulfill promises - std::lock_guard lock(promises_mutex_); - switch (message_type) { case MvecMessageType::RELAY_QUERY_RESPONSE: { const auto & reply = relay_impl_.get_last_relay_query_reply(); - if (reply.is_valid() && !query_reply_promises_.empty()) { - // Get the oldest waiting promise - auto promise = std::move(query_reply_promises_.front()); - query_reply_promises_.pop(); - - // Fulfill the promise - promise.set_value(reply); + std::lock_guard lock(query_mutex_); + if (reply.is_valid() && query_reply_promise_.has_value()) { + query_reply_promise_->set_value(std::make_optional(reply)); + query_reply_promise_.reset(); } break; } case MvecMessageType::RELAY_COMMAND_RESPONSE: { const auto & reply = relay_impl_.get_last_relay_command_reply(); - if (reply.is_valid() && !command_reply_promises_.empty()) { - auto promise = std::move(command_reply_promises_.front()); - command_reply_promises_.pop(); - promise.set_value(reply); + std::lock_guard lock(command_mutex_); + if (reply.is_valid() && command_reply_promise_.has_value()) { + command_reply_promise_->set_value(std::make_optional(reply)); + command_reply_promise_.reset(); } break; } case MvecMessageType::POPULATION_RESPONSE: { const auto & reply = relay_impl_.get_last_population_reply(); - if (reply.is_valid() && !population_reply_promises_.empty()) { - auto promise = std::move(population_reply_promises_.front()); - population_reply_promises_.pop(); - promise.set_value(reply); + std::lock_guard lock(population_mutex_); + if (reply.is_valid() && population_reply_promise_.has_value()) { + population_reply_promise_->set_value(std::make_optional(reply)); + population_reply_promise_.reset(); } break; } @@ -82,67 +83,85 @@ void MvecRelaySocketcan::clear_relay() relay_impl_.clearRelayCommands(); } -std::future MvecRelaySocketcan::get_relay_state() +std::future> MvecRelaySocketcan::get_relay_state() { - // Get the query message from the relay implementation - /// TODO: (zeerek) Set invalid for received message - auto query_frame = relay_impl_.getRelayQueryMessage(); + std::lock_guard lock(query_mutex_); + + // If a request is already in-flight, check if it has timed out + if (query_reply_promise_.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - query_send_time_; + if (elapsed < response_timeout_) { + // Still in-flight, reject this request + std::promise> rejected; + auto future = rejected.get_future(); + rejected.set_value(std::nullopt); + return future; + } + // Timed out — fulfill the old promise with nullopt so any waiter gets a clean result + query_reply_promise_->set_value(std::nullopt); + query_reply_promise_.reset(); + } - // Create a new promise and get its future - std::promise promise; + std::promise> promise; auto future = promise.get_future(); + query_reply_promise_.emplace(std::move(promise)); + query_send_time_ = std::chrono::steady_clock::now(); - // Add promise to the queue with thread safety - { - std::lock_guard lock(promises_mutex_); - query_reply_promises_.push(std::move(promise)); - } - - // Transmit the query message via socketcan adapter + auto query_frame = relay_impl_.getRelayQueryMessage(); socketcan_adapter_->send(query_frame); return future; } -std::future MvecRelaySocketcan::send_relay_command() +std::future> MvecRelaySocketcan::send_relay_command() { - // Get the command message from the relay implementation - /// TODO: (zeerek) Set invalid for received message - auto command_frame = relay_impl_.getRelayCommandMessage(); + std::lock_guard lock(command_mutex_); + + if (command_reply_promise_.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - command_send_time_; + if (elapsed < response_timeout_) { + std::promise> rejected; + auto future = rejected.get_future(); + rejected.set_value(std::nullopt); + return future; + } + command_reply_promise_->set_value(std::nullopt); + command_reply_promise_.reset(); + } - // Create a new promise and get its future - std::promise promise; + std::promise> promise; auto future = promise.get_future(); + command_reply_promise_.emplace(std::move(promise)); + command_send_time_ = std::chrono::steady_clock::now(); - // Add promise to the queue with thread safety - { - std::lock_guard lock(promises_mutex_); - command_reply_promises_.push(std::move(promise)); - } - - // Transmit the command message via socketcan adapter + auto command_frame = relay_impl_.getRelayCommandMessage(); socketcan_adapter_->send(command_frame); return future; } -std::future MvecRelaySocketcan::get_relay_population() +std::future> MvecRelaySocketcan::get_relay_population() { - // Get the population query message from the relay implementation - /// TODO: (zeerek) Set invalid for received message - auto population_frame = relay_impl_.getPopulationQueryMessage(); + std::lock_guard lock(population_mutex_); + + if (population_reply_promise_.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - population_send_time_; + if (elapsed < response_timeout_) { + std::promise> rejected; + auto future = rejected.get_future(); + rejected.set_value(std::nullopt); + return future; + } + population_reply_promise_->set_value(std::nullopt); + population_reply_promise_.reset(); + } - // Create a new promise and get its future - std::promise promise; + std::promise> promise; auto future = promise.get_future(); + population_reply_promise_.emplace(std::move(promise)); + population_send_time_ = std::chrono::steady_clock::now(); - // Add promise to the queue with thread safety - { - std::lock_guard lock(promises_mutex_); - population_reply_promises_.push(std::move(promise)); - } - - // Transmit the population query message via socketcan adapter + auto population_frame = relay_impl_.getPopulationQueryMessage(); socketcan_adapter_->send(population_frame); return future; diff --git a/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp b/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp index f63e208..1adccce 100644 --- a/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp +++ b/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp @@ -64,7 +64,10 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") auto status = population_future.wait_for(std::chrono::seconds(5)); if (status == std::future_status::ready) { - auto population_reply = population_future.get(); + auto population_result = population_future.get(); + REQUIRE(population_result.has_value()); + + const auto & population_reply = population_result.value(); std::cout << "Population query successful! Valid: " << population_reply.is_valid() << std::endl; // Check that we got a valid response @@ -101,7 +104,10 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") auto status = relay_state_future.wait_for(std::chrono::seconds(5)); if (status == std::future_status::ready) { - auto relay_query_reply = relay_state_future.get(); + auto relay_query_result = relay_state_future.get(); + REQUIRE(relay_query_result.has_value()); + + const auto & relay_query_reply = relay_query_result.value(); std::cout << "Relay state query successful! Valid: " << relay_query_reply.is_valid() << std::endl; // Check that we got a valid response @@ -219,18 +225,20 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") REQUIRE(status == std::future_status::ready); - auto response = relay_command_response_future.get(); + auto response_result = relay_command_response_future.get(); + REQUIRE(response_result.has_value()); // 1 is no error - REQUIRE(response.get_success() == 1); + REQUIRE(response_result->get_success() == 1); std::cout << "Relay response message confirms success" << std::endl; auto mvec_query_future = mvec_socketcan->get_relay_state(); status = mvec_query_future.wait_for(std::chrono::seconds(5)); - auto relay_state = mvec_query_future.get(); + auto relay_state_result = mvec_query_future.get(); + REQUIRE(relay_state_result.has_value()); - REQUIRE(relay_state.get_relay_state(8) == 1); - REQUIRE(relay_state.get_relay_state(9) == 1); + REQUIRE(relay_state_result->get_relay_state(8) == 1); + REQUIRE(relay_state_result->get_relay_state(9) == 1); std::cout << "Relay states queried and confirm command" << std::endl; } diff --git a/mvec/mvec_ros2/src/mvec_node.cpp b/mvec/mvec_ros2/src/mvec_node.cpp index 8637bb7..153ca4b 100644 --- a/mvec/mvec_ros2/src/mvec_node.cpp +++ b/mvec/mvec_ros2/src/mvec_node.cpp @@ -171,8 +171,13 @@ void MvecNode::timerCallback() auto status = relay_state_future.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { - auto relay_query_reply = relay_state_future.get(); - if (relay_query_reply.is_valid()) { + auto relay_query_result = relay_state_future.get(); + if (!relay_query_result.has_value()) { + RCLCPP_DEBUG(get_logger(), "Relay state query returned nullopt (timed out or rejected due to pending request)"); + } else if (!relay_query_result->is_valid()) { + RCLCPP_DEBUG(get_logger(), "Invalid relay query response received"); + } else { + const auto & relay_query_reply = relay_query_result.value(); // Store current relay states mvec_msgs::msg::MvecFeedback feedback_msg; feedback_msg.header.stamp = get_clock()->now(); @@ -193,11 +198,9 @@ void MvecNode::timerCallback() addDefaultPresetIfNotPresent(defaults); current_relay_states_ = feedback_msg; RCLCPP_DEBUG(get_logger(), "Updated relay states from hardware"); - } else { - RCLCPP_DEBUG(get_logger(), "Invalid relay query response received"); } } else { - RCLCPP_DEBUG(get_logger(), "Relay state query timed out"); + RCLCPP_DEBUG(get_logger(), "Relay state query timed out waiting for future"); } // Publish diagnostics array @@ -250,21 +253,27 @@ std::optional MvecNode::set_single_relay(mvec_msgs::msg::Relay rela auto future = mvec_socketcan_->send_relay_command(); auto status = future.wait_for(timeout_ms_); - if (status == std::future_status::ready) { - auto command_reply = future.get(); - if (command_reply.get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); - return std::nullopt; // Success - } else { - std::string error_msg = "MVEC device rejected relay command"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } - } else { + if (status != std::future_status::ready) { std::string error_msg = "Timeout waiting for relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } + + auto command_result = future.get(); + if (!command_result.has_value()) { + std::string error_msg = "Relay command returned no response (timeout or rejected)"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } + + if (command_result->get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); + return std::nullopt; // Success + } + + std::string error_msg = "MVEC device rejected relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; } std::optional MvecNode::set_multi_relay(const std::vector & relays) @@ -292,21 +301,27 @@ std::optional MvecNode::set_multi_relay(const std::vectorsend_relay_command(); auto status = future.wait_for(timeout_ms_); - if (status == std::future_status::ready) { - auto command_reply = future.get(); - if (command_reply.get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set %zu relays", relays.size()); - return std::nullopt; // Success - } else { - std::string error_msg = "MVEC device rejected multi-relay command"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } - } else { + if (status != std::future_status::ready) { std::string error_msg = "Timeout waiting for multi-relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } + + auto command_result = future.get(); + if (!command_result.has_value()) { + std::string error_msg = "Multi-relay command returned no response (timeout or rejected)"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } + + if (command_result->get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set %zu relays", relays.size()); + return std::nullopt; // Success + } + + std::string error_msg = "MVEC device rejected multi-relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; } catch (const std::exception & e) { std::string error_msg = "Exception during multi-relay command: " + std::string(e.what()); RCLCPP_ERROR(get_logger(), "%s", error_msg.c_str()); From 838657ada9cfa1ab83f809662e1fe15753c34b3e Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 25 Feb 2026 19:00:52 +0000 Subject: [PATCH 2/8] fix pre-commit errors --- mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp | 3 +-- mvec/mvec_lib/src/mvec_relay_socketcan.cpp | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp index 5d35f73..07914c9 100644 --- a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp +++ b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp @@ -40,8 +40,7 @@ class MvecRelaySocketcan /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication /// @param response_timeout Timeout for all MVEC response types MvecRelaySocketcan( - std::shared_ptr socketcan_adapter, - std::chrono::milliseconds response_timeout); + std::shared_ptr socketcan_adapter, std::chrono::milliseconds response_timeout); /// @brief Parse incoming CAN frame and fulfill waiting promises /// @param frame CAN frame to parse diff --git a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp index 864f8bd..cfb67e9 100644 --- a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp +++ b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp @@ -25,8 +25,7 @@ MvecRelaySocketcan::MvecRelaySocketcan(std::shared_ptr socketcan_adapter, - std::chrono::milliseconds response_timeout) + std::shared_ptr socketcan_adapter, std::chrono::milliseconds response_timeout) : socketcan_adapter_(socketcan_adapter) , relay_impl_() , response_timeout_(response_timeout) From 6ac8c32481ad96f73d9cb56b2d5b284596233647 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 25 Feb 2026 19:03:04 +0000 Subject: [PATCH 3/8] fix pre-commit error --- mvec/mvec_lib/src/mvec_relay_socketcan.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp index cfb67e9..f694033 100644 --- a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp +++ b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp @@ -14,6 +14,7 @@ #include "mvec_lib/mvec_relay_socketcan.hpp" +#include #include #include From 225560a6ea0089a47104472fbe36be7f9376ff37 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 18 Mar 2026 11:26:50 +0000 Subject: [PATCH 4/8] reset mvec_node --- mvec/mvec_ros2/src/mvec_node.cpp | 69 +++++++++++++------------------- 1 file changed, 27 insertions(+), 42 deletions(-) diff --git a/mvec/mvec_ros2/src/mvec_node.cpp b/mvec/mvec_ros2/src/mvec_node.cpp index 153ca4b..8637bb7 100644 --- a/mvec/mvec_ros2/src/mvec_node.cpp +++ b/mvec/mvec_ros2/src/mvec_node.cpp @@ -171,13 +171,8 @@ void MvecNode::timerCallback() auto status = relay_state_future.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { - auto relay_query_result = relay_state_future.get(); - if (!relay_query_result.has_value()) { - RCLCPP_DEBUG(get_logger(), "Relay state query returned nullopt (timed out or rejected due to pending request)"); - } else if (!relay_query_result->is_valid()) { - RCLCPP_DEBUG(get_logger(), "Invalid relay query response received"); - } else { - const auto & relay_query_reply = relay_query_result.value(); + auto relay_query_reply = relay_state_future.get(); + if (relay_query_reply.is_valid()) { // Store current relay states mvec_msgs::msg::MvecFeedback feedback_msg; feedback_msg.header.stamp = get_clock()->now(); @@ -198,9 +193,11 @@ void MvecNode::timerCallback() addDefaultPresetIfNotPresent(defaults); current_relay_states_ = feedback_msg; RCLCPP_DEBUG(get_logger(), "Updated relay states from hardware"); + } else { + RCLCPP_DEBUG(get_logger(), "Invalid relay query response received"); } } else { - RCLCPP_DEBUG(get_logger(), "Relay state query timed out waiting for future"); + RCLCPP_DEBUG(get_logger(), "Relay state query timed out"); } // Publish diagnostics array @@ -253,27 +250,21 @@ std::optional MvecNode::set_single_relay(mvec_msgs::msg::Relay rela auto future = mvec_socketcan_->send_relay_command(); auto status = future.wait_for(timeout_ms_); - if (status != std::future_status::ready) { + if (status == std::future_status::ready) { + auto command_reply = future.get(); + if (command_reply.get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); + return std::nullopt; // Success + } else { + std::string error_msg = "MVEC device rejected relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } + } else { std::string error_msg = "Timeout waiting for relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } - - auto command_result = future.get(); - if (!command_result.has_value()) { - std::string error_msg = "Relay command returned no response (timeout or rejected)"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } - - if (command_result->get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); - return std::nullopt; // Success - } - - std::string error_msg = "MVEC device rejected relay command"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; } std::optional MvecNode::set_multi_relay(const std::vector & relays) @@ -301,27 +292,21 @@ std::optional MvecNode::set_multi_relay(const std::vectorsend_relay_command(); auto status = future.wait_for(timeout_ms_); - if (status != std::future_status::ready) { + if (status == std::future_status::ready) { + auto command_reply = future.get(); + if (command_reply.get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set %zu relays", relays.size()); + return std::nullopt; // Success + } else { + std::string error_msg = "MVEC device rejected multi-relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } + } else { std::string error_msg = "Timeout waiting for multi-relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } - - auto command_result = future.get(); - if (!command_result.has_value()) { - std::string error_msg = "Multi-relay command returned no response (timeout or rejected)"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } - - if (command_result->get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set %zu relays", relays.size()); - return std::nullopt; // Success - } - - std::string error_msg = "MVEC device rejected multi-relay command"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; } catch (const std::exception & e) { std::string error_msg = "Exception during multi-relay command: " + std::string(e.what()); RCLCPP_ERROR(get_logger(), "%s", error_msg.c_str()); From 162f96d80d13bbd87a16209e6f7b25b72f561847 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 18 Mar 2026 11:30:34 +0000 Subject: [PATCH 5/8] wrap set_single_relay in a set_single_relay() to align with set_multi_relay() --- mvec/mvec_ros2/src/mvec_node.cpp | 52 ++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/mvec/mvec_ros2/src/mvec_node.cpp b/mvec/mvec_ros2/src/mvec_node.cpp index 8637bb7..d2d35ae 100644 --- a/mvec/mvec_ros2/src/mvec_node.cpp +++ b/mvec/mvec_ros2/src/mvec_node.cpp @@ -232,37 +232,43 @@ void MvecNode::addDefaultPresetIfNotPresent(const std::vector MvecNode::set_single_relay(mvec_msgs::msg::Relay relay) { - /// TODO:(zeerekahmad): Reject unpopulated relay? - // Validate relay ID - if (relay.relay_id >= polymath::sygnal::MvecHardware::MAX_NUMBER_RELAYS) { - std::string error_msg = "Invalid relay ID: " + std::to_string(relay.relay_id); - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } + try { + /// TODO:(zeerekahmad): Reject unpopulated relay? + // Validate relay ID + if (relay.relay_id >= polymath::sygnal::MvecHardware::MAX_NUMBER_RELAYS) { + std::string error_msg = "Invalid relay ID: " + std::to_string(relay.relay_id); + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } - // Clear relay command before setting new state - mvec_socketcan_->clear_relay(); + // Clear relay command before setting new state + mvec_socketcan_->clear_relay(); - // Set the specific relay state - mvec_socketcan_->set_relay_in_command(relay.relay_id, relay.state ? 1 : 0); + // Set the specific relay state + mvec_socketcan_->set_relay_in_command(relay.relay_id, relay.state ? 1 : 0); - // Send command and wait for response - auto future = mvec_socketcan_->send_relay_command(); - auto status = future.wait_for(timeout_ms_); + // Send command and wait for response + auto future = mvec_socketcan_->send_relay_command(); + auto status = future.wait_for(timeout_ms_); - if (status == std::future_status::ready) { - auto command_reply = future.get(); - if (command_reply.get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); - return std::nullopt; // Success + if (status == std::future_status::ready) { + auto command_reply = future.get(); + if (command_reply.get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); + return std::nullopt; // Success + } else { + std::string error_msg = "MVEC device rejected relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } } else { - std::string error_msg = "MVEC device rejected relay command"; + std::string error_msg = "Timeout waiting for relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } - } else { - std::string error_msg = "Timeout waiting for relay command response"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + } catch (const std::exception & e) { + std::string error_msg = "Exception during single-relay command: " + std::string(e.what()); + RCLCPP_ERROR(get_logger(), "%s", error_msg.c_str()); return error_msg; } } From 4b75b86def719cf4ecf8bb44692e456438e60637 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 18 Mar 2026 12:16:23 +0000 Subject: [PATCH 6/8] removed optionals and blocking for existing messages with timeouts. Updated to just breaking promises for in-flight messages if their responses are not received. --- mvec/mvec_lib/CMakeLists.txt | 16 + mvec/mvec_lib/README.md | 10 +- .../include/mvec_lib/mvec_relay_socketcan.hpp | 58 +-- mvec/mvec_lib/src/mvec_relay_socketcan.cpp | 124 ++---- mvec/mvec_lib/test/mvec_relay_socketcan.cpp | 379 ++++++++++++++++++ 5 files changed, 457 insertions(+), 130 deletions(-) create mode 100644 mvec/mvec_lib/test/mvec_relay_socketcan.cpp diff --git a/mvec/mvec_lib/CMakeLists.txt b/mvec/mvec_lib/CMakeLists.txt index 96fcdff..eaec943 100644 --- a/mvec/mvec_lib/CMakeLists.txt +++ b/mvec/mvec_lib/CMakeLists.txt @@ -127,6 +127,22 @@ if(BUILD_TESTING) WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}" ) + add_executable(relay_socketcan_tests + test/mvec_relay_socketcan.cpp + ) + target_link_libraries(relay_socketcan_tests + PRIVATE ${PROJECT_NAME} Catch2::Catch2WithMain + ) + ament_add_test( + relay_socketcan_tests + GENERATE_RESULT_FOR_RETURN_CODE_ZERO + COMMAND "$" + -r junit -s + -o test_results/${PROJECT_NAME}/relay_socketcan_tests_output.xml + ENV CATCH_CONFIG_CONSOLE_WIDTH=120 + WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}" + ) + # Only build hardware tests if CAN_AVAILABLE is set in the environment if(DEFINED ENV{CAN_AVAILABLE}) add_executable(socketcan_hardware_tests diff --git a/mvec/mvec_lib/README.md b/mvec/mvec_lib/README.md index 278feb6..571fa5f 100644 --- a/mvec/mvec_lib/README.md +++ b/mvec/mvec_lib/README.md @@ -8,7 +8,7 @@ A comprehensive C++ library for controlling and monitoring MVEC (multiplexed Veh The foundational class providing J1939 message parsing, relay command generation, and status message handling. Supports all MVEC protocol operations including relay control, population queries, and diagnostic monitoring. ### MvecRelaySocketcan -High-level asynchronous interface built on top of MvecRelay using SocketCAN for Linux CAN communication. Features thread-safe promise/future patterns for non-blocking operations and automatic response handling. +High-level asynchronous interface built on top of MvecRelay using SocketCAN. Each request method (query, command, population) returns a `std::future` for the response. If a new request of the same type is made while one is in-flight, the previous promise is abandoned and the caller's future throws `broken_promise`. Callers use `wait_for()` to handle response timeouts. Per-type mutexes ensure thread safety between `parse()` and request methods. ## Quick Start @@ -44,15 +44,15 @@ cd build/mvec_lib && ./mvec_socketcan_hardware.cpp #include "socketcan_adapter/socketcan_adapter.hpp" // Create SocketCAN adapter -auto adapter = std::make_shared("can0"); -adapter->open(); +auto adapter = std::make_shared("can0"); +adapter->openSocket(); // Create MVEC controller polymath::sygnal::MvecRelaySocketcan controller(adapter); // Set up callback for incoming messages -adapter->setReceptionCallback([&controller](const socketcan::CanFrame& frame) { - controller.parse(frame); +adapter->setOnReceiveCallback([&controller](std::unique_ptr frame) { + controller.parse(*frame); }); ``` diff --git a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp index 07914c9..c9fea13 100644 --- a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp +++ b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp @@ -15,7 +15,6 @@ #ifndef MVEC_LIB__MVEC_RELAY_SOCKETCAN_HPP_ #define MVEC_LIB__MVEC_RELAY_SOCKETCAN_HPP_ -#include #include #include #include @@ -27,21 +26,17 @@ namespace polymath::sygnal { -/// @brief MVEC relay controller with async SocketCAN communication -/// Provides high-level interface for MVEC relay control with thread-safe promise/future pattern +/// @brief MVEC relay controller with async SocketCAN communication. +/// Each request method (query/command/population) abandons any in-flight request of the same type, +/// sends a new CAN frame, and returns a future for the response. +/// If a previous request was still pending, its promise is destroyed and the caller's future +/// throws broken_promise on get(). Callers use wait_for() to handle response timeouts. class MvecRelaySocketcan { public: - /// @brief Constructor with default 500ms response timeout /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication explicit MvecRelaySocketcan(std::shared_ptr socketcan_adapter); - /// @brief Constructor with custom response timeout - /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication - /// @param response_timeout Timeout for all MVEC response types - MvecRelaySocketcan( - std::shared_ptr socketcan_adapter, std::chrono::milliseconds response_timeout); - /// @brief Parse incoming CAN frame and fulfill waiting promises /// @param frame CAN frame to parse /// @return Message type that was parsed @@ -56,54 +51,43 @@ class MvecRelaySocketcan void clear_relay(); /// @brief Query current relay states asynchronously - /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. - std::future> get_relay_state(); + /// Abandons any in-flight query (caller's future throws broken_promise). + /// @return Future containing relay query reply. Use wait_for() to handle timeouts. + std::future get_relay_state(); - /// @brief Send relay command and wait for confirmation - /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. - std::future> send_relay_command(); + /// @brief Send relay command and get confirmation asynchronously + /// Abandons any in-flight command (caller's future throws broken_promise). + /// @return Future containing command reply. Use wait_for() to handle timeouts. + std::future send_relay_command(); /// @brief Query device population (which relays/fuses are installed) - /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. - std::future> get_relay_population(); + /// Abandons any in-flight query (caller's future throws broken_promise). + /// @return Future containing population reply. Use wait_for() to handle timeouts. + std::future get_relay_population(); /// @brief Get last received fuse status message /// @return Optional containing fuse status if valid data available - const std::optional get_last_fuse_status(); + std::optional get_last_fuse_status() const; /// @brief Get last received relay status message /// @return Optional containing relay status if valid data available - const std::optional get_last_relay_status(); + std::optional get_last_relay_status() const; /// @brief Get last received error status message /// @return Optional containing error status if valid data available - const std::optional get_last_error_status(); - - /// @brief Default timeout for MVEC responses (500ms) - static constexpr std::chrono::milliseconds MVEC_DEFAULT_RESPONSE_TIMEOUT{500}; + std::optional get_last_error_status() const; private: - /// @brief SocketCAN adapter for CAN communication std::shared_ptr socketcan_adapter_; - /// @brief Core MVEC relay implementation MvecRelay relay_impl_; - /// @brief Timeout for all MVEC response types - std::chrono::milliseconds response_timeout_; - - /// @brief Single-slot promise for relay query response - std::optional>> query_reply_promise_; - std::chrono::steady_clock::time_point query_send_time_; + std::optional> query_reply_promise_; std::mutex query_mutex_; - /// @brief Single-slot promise for relay command response - std::optional>> command_reply_promise_; - std::chrono::steady_clock::time_point command_send_time_; + std::optional> command_reply_promise_; std::mutex command_mutex_; - /// @brief Single-slot promise for population query response - std::optional>> population_reply_promise_; - std::chrono::steady_clock::time_point population_send_time_; + std::optional> population_reply_promise_; std::mutex population_mutex_; }; diff --git a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp index f694033..54d18c3 100644 --- a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp +++ b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp @@ -22,26 +22,22 @@ namespace polymath::sygnal { MvecRelaySocketcan::MvecRelaySocketcan(std::shared_ptr socketcan_adapter) -: MvecRelaySocketcan(socketcan_adapter, MVEC_DEFAULT_RESPONSE_TIMEOUT) -{} - -MvecRelaySocketcan::MvecRelaySocketcan( - std::shared_ptr socketcan_adapter, std::chrono::milliseconds response_timeout) : socketcan_adapter_(socketcan_adapter) , relay_impl_() -, response_timeout_(response_timeout) {} MvecMessageType MvecRelaySocketcan::parse(const socketcan::CanFrame & frame) { - MvecMessageType message_type = relay_impl_.parseMessage(frame); + const MvecMessageType message_type = relay_impl_.parseMessage(frame); + // Check if we received an expected response type and fulfill the waiting promise switch (message_type) { case MvecMessageType::RELAY_QUERY_RESPONSE: { const auto & reply = relay_impl_.get_last_relay_query_reply(); std::lock_guard lock(query_mutex_); if (reply.is_valid() && query_reply_promise_.has_value()) { - query_reply_promise_->set_value(std::make_optional(reply)); + // Fulfill the promise and clear the slot + query_reply_promise_->set_value(reply); query_reply_promise_.reset(); } break; @@ -50,7 +46,7 @@ MvecMessageType MvecRelaySocketcan::parse(const socketcan::CanFrame & frame) const auto & reply = relay_impl_.get_last_relay_command_reply(); std::lock_guard lock(command_mutex_); if (reply.is_valid() && command_reply_promise_.has_value()) { - command_reply_promise_->set_value(std::make_optional(reply)); + command_reply_promise_->set_value(reply); command_reply_promise_.reset(); } break; @@ -59,7 +55,7 @@ MvecMessageType MvecRelaySocketcan::parse(const socketcan::CanFrame & frame) const auto & reply = relay_impl_.get_last_population_reply(); std::lock_guard lock(population_mutex_); if (reply.is_valid() && population_reply_promise_.has_value()) { - population_reply_promise_->set_value(std::make_optional(reply)); + population_reply_promise_->set_value(reply); population_reply_promise_.reset(); } break; @@ -83,130 +79,82 @@ void MvecRelaySocketcan::clear_relay() relay_impl_.clearRelayCommands(); } -std::future> MvecRelaySocketcan::get_relay_state() +std::future MvecRelaySocketcan::get_relay_state() { std::lock_guard lock(query_mutex_); - // If a request is already in-flight, check if it has timed out - if (query_reply_promise_.has_value()) { - auto elapsed = std::chrono::steady_clock::now() - query_send_time_; - if (elapsed < response_timeout_) { - // Still in-flight, reject this request - std::promise> rejected; - auto future = rejected.get_future(); - rejected.set_value(std::nullopt); - return future; - } - // Timed out — fulfill the old promise with nullopt so any waiter gets a clean result - query_reply_promise_->set_value(std::nullopt); - query_reply_promise_.reset(); - } + // Abandon any in-flight query, caller's future becomes broken_promise if still waiting + query_reply_promise_.reset(); - std::promise> promise; + // Create a new promise and get its future + std::promise promise; auto future = promise.get_future(); query_reply_promise_.emplace(std::move(promise)); - query_send_time_ = std::chrono::steady_clock::now(); - - auto query_frame = relay_impl_.getRelayQueryMessage(); - socketcan_adapter_->send(query_frame); + // Transmit the query message via socketcan adapter + socketcan_adapter_->send(relay_impl_.getRelayQueryMessage()); return future; } -std::future> MvecRelaySocketcan::send_relay_command() +std::future MvecRelaySocketcan::send_relay_command() { std::lock_guard lock(command_mutex_); - if (command_reply_promise_.has_value()) { - auto elapsed = std::chrono::steady_clock::now() - command_send_time_; - if (elapsed < response_timeout_) { - std::promise> rejected; - auto future = rejected.get_future(); - rejected.set_value(std::nullopt); - return future; - } - command_reply_promise_->set_value(std::nullopt); - command_reply_promise_.reset(); - } + // Abandon any in-flight command, caller's future becomes broken_promise if still waiting + command_reply_promise_.reset(); - std::promise> promise; + // Create a new promise and get its future + std::promise promise; auto future = promise.get_future(); command_reply_promise_.emplace(std::move(promise)); - command_send_time_ = std::chrono::steady_clock::now(); - - auto command_frame = relay_impl_.getRelayCommandMessage(); - socketcan_adapter_->send(command_frame); + // Transmit the command message via socketcan adapter + socketcan_adapter_->send(relay_impl_.getRelayCommandMessage()); return future; } -std::future> MvecRelaySocketcan::get_relay_population() +std::future MvecRelaySocketcan::get_relay_population() { std::lock_guard lock(population_mutex_); - if (population_reply_promise_.has_value()) { - auto elapsed = std::chrono::steady_clock::now() - population_send_time_; - if (elapsed < response_timeout_) { - std::promise> rejected; - auto future = rejected.get_future(); - rejected.set_value(std::nullopt); - return future; - } - population_reply_promise_->set_value(std::nullopt); - population_reply_promise_.reset(); - } + // Abandon any in-flight query, caller's future becomes broken_promise if still waiting + population_reply_promise_.reset(); - std::promise> promise; + // Create a new promise and get its future + std::promise promise; auto future = promise.get_future(); population_reply_promise_.emplace(std::move(promise)); - population_send_time_ = std::chrono::steady_clock::now(); - - auto population_frame = relay_impl_.getPopulationQueryMessage(); - socketcan_adapter_->send(population_frame); + // Transmit the population query message via socketcan adapter + socketcan_adapter_->send(relay_impl_.getPopulationQueryMessage()); return future; } -const std::optional MvecRelaySocketcan::get_last_fuse_status() +std::optional MvecRelaySocketcan::get_last_fuse_status() const { - static std::optional result; - const auto & fuse_status = relay_impl_.get_fuse_status_message(); if (fuse_status.is_valid()) { - result = fuse_status; - } else { - result = std::nullopt; + return fuse_status; } - - return result; + return std::nullopt; } -const std::optional MvecRelaySocketcan::get_last_relay_status() +std::optional MvecRelaySocketcan::get_last_relay_status() const { - static std::optional result; - const auto & relay_status = relay_impl_.get_relay_status_message(); if (relay_status.is_valid()) { - result = relay_status; - } else { - result = std::nullopt; + return relay_status; } - - return result; + return std::nullopt; } -const std::optional MvecRelaySocketcan::get_last_error_status() +std::optional MvecRelaySocketcan::get_last_error_status() const { - static std::optional result; - const auto & error_status = relay_impl_.get_error_status_message(); if (error_status.is_valid()) { - result = error_status; - } else { - result = std::nullopt; + return error_status; } - - return result; + return std::nullopt; } } // namespace polymath::sygnal diff --git a/mvec/mvec_lib/test/mvec_relay_socketcan.cpp b/mvec/mvec_lib/test/mvec_relay_socketcan.cpp new file mode 100644 index 0000000..cdb33f2 --- /dev/null +++ b/mvec/mvec_lib/test/mvec_relay_socketcan.cpp @@ -0,0 +1,379 @@ +// Copyright (c) 2025-present Polymath Robotics, Inc. All rights reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "mvec_lib/mvec_relay_socketcan.hpp" + +#include +#include +#include +#include +#include + +#if __has_include() + #include +#elif __has_include() + #include +#else + #error "Catch2 headers not found. Please install Catch2 (v2 or v3)." +#endif + +#include "mvec_lib/core/mvec_constants.hpp" +#include "socketcan_adapter/can_frame.hpp" +#include "socketcan_adapter/socketcan_adapter.hpp" + +using namespace std::chrono_literals; + +static polymath::socketcan::CanFrame createTestFrame(uint32_t can_id, const std::vector & data) +{ + polymath::socketcan::CanFrame frame; + frame.set_can_id(can_id); + frame.set_id_as_extended(); + std::array frame_data; + frame_data.fill(0); + for (size_t i = 0; i < data.size() && i < CAN_MAX_DLC; ++i) { + frame_data[i] = data[i]; + } + frame.set_data(frame_data); + return frame; +} + +// J1939 ID for specific (peer-to-peer) responses from MVEC device +static const J1939_ID SPECIFIC_RESPONSE_ID( + 6, 0, polymath::sygnal::MvecProtocol::QUERY_PDU, 0x00, polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); + +static polymath::socketcan::CanFrame makeQueryReplyFrame() +{ + return createTestFrame(SPECIFIC_RESPONSE_ID.get_can_id(), {0x96, 0x00, 0xFF, 0x0F, 0xAA, 0x55, 0x00, 0x00}); +} + +static polymath::socketcan::CanFrame makeCommandReplyFrame() +{ + return createTestFrame(SPECIFIC_RESPONSE_ID.get_can_id(), {0x01, 0x88, 0x01, 0x00, 0xFF, 0x0F, 0x00, 0x00}); +} + +static polymath::socketcan::CanFrame makePopulationReplyFrame() +{ + return createTestFrame(SPECIFIC_RESPONSE_ID.get_can_id(), {0x94, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x0F, 0x00}); +} + +static std::shared_ptr makeDummyAdapter() +{ + // Socket is never opened — send() fails silently, which is fine for testing promise logic + return std::make_shared("dummy_test_iface"); +} + +// ============================================================================ +// Query tests (get_relay_state) +// ============================================================================ + +TEST_CASE("Query returns future that is fulfilled by parse", "[mvec_relay_socketcan][query]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto future = mvec.get_relay_state(); + REQUIRE(future.wait_for(0ms) == std::future_status::timeout); + + auto msg_type = mvec.parse(makeQueryReplyFrame()); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::RELAY_QUERY_RESPONSE); + REQUIRE(future.wait_for(0ms) == std::future_status::ready); + + auto reply = future.get(); + REQUIRE(reply.is_valid()); +} + +TEST_CASE("Query abandon-and-resend: second call breaks first future", "[mvec_relay_socketcan][query]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto future1 = mvec.get_relay_state(); + auto future2 = mvec.get_relay_state(); + + // future1's promise was destroyed — get() throws broken_promise + REQUIRE_THROWS_AS(future1.get(), std::future_error); + + // future2 is still pending + REQUIRE(future2.wait_for(0ms) == std::future_status::timeout); + + // Response fulfills the second (current) promise + mvec.parse(makeQueryReplyFrame()); + REQUIRE(future2.wait_for(0ms) == std::future_status::ready); + + auto reply = future2.get(); + REQUIRE(reply.is_valid()); +} + +TEST_CASE("Query: mismatched response still valid (idempotent)", "[mvec_relay_socketcan][query]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + // Send two queries rapidly — second abandons first + auto future1 = mvec.get_relay_state(); + auto future2 = mvec.get_relay_state(); + + // First CAN response arrives (response to query 1, but fills promise 2) + mvec.parse(makeQueryReplyFrame()); + REQUIRE(future2.wait_for(0ms) == std::future_status::ready); + + auto reply = future2.get(); + REQUIRE(reply.is_valid()); + + // Second response arrives — no promise, silently dropped + auto msg_type = mvec.parse(makeQueryReplyFrame()); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::RELAY_QUERY_RESPONSE); +} + +TEST_CASE("Query: response with no pending promise is silently dropped", "[mvec_relay_socketcan][query]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto msg_type = mvec.parse(makeQueryReplyFrame()); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::RELAY_QUERY_RESPONSE); +} + +TEST_CASE("Query: wait_for timeout when response never arrives", "[mvec_relay_socketcan][query]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto future = mvec.get_relay_state(); + REQUIRE(future.wait_for(10ms) == std::future_status::timeout); +} + +// ============================================================================ +// Command tests (send_relay_command) +// ============================================================================ + +TEST_CASE("Command returns future that is fulfilled by parse", "[mvec_relay_socketcan][command]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + mvec.set_relay_in_command(0, 1); + + auto future = mvec.send_relay_command(); + REQUIRE(future.wait_for(0ms) == std::future_status::timeout); + + mvec.parse(makeCommandReplyFrame()); + REQUIRE(future.wait_for(0ms) == std::future_status::ready); + + auto reply = future.get(); + REQUIRE(reply.is_valid()); +} + +TEST_CASE("Command abandon-and-resend: second call breaks first future", "[mvec_relay_socketcan][command]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + mvec.set_relay_in_command(0, 1); + + auto future1 = mvec.send_relay_command(); + mvec.set_relay_in_command(1, 1); + auto future2 = mvec.send_relay_command(); + + // future1's promise was destroyed — get() throws broken_promise + REQUIRE_THROWS_AS(future1.get(), std::future_error); + + // future2 is still pending + REQUIRE(future2.wait_for(0ms) == std::future_status::timeout); + + // Response fulfills the second (current) promise + mvec.parse(makeCommandReplyFrame()); + REQUIRE(future2.wait_for(0ms) == std::future_status::ready); + + auto reply = future2.get(); + REQUIRE(reply.is_valid()); +} + +TEST_CASE("Command: response with no pending promise is silently dropped", "[mvec_relay_socketcan][command]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto msg_type = mvec.parse(makeCommandReplyFrame()); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::RELAY_COMMAND_RESPONSE); +} + +// ============================================================================ +// Population tests (get_relay_population) +// ============================================================================ + +TEST_CASE("Population returns future that is fulfilled by parse", "[mvec_relay_socketcan][population]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto future = mvec.get_relay_population(); + REQUIRE(future.wait_for(0ms) == std::future_status::timeout); + + mvec.parse(makePopulationReplyFrame()); + REQUIRE(future.wait_for(0ms) == std::future_status::ready); + + auto reply = future.get(); + REQUIRE(reply.is_valid()); +} + +TEST_CASE("Population abandon-and-resend: second call breaks first future", "[mvec_relay_socketcan][population]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + auto future1 = mvec.get_relay_population(); + auto future2 = mvec.get_relay_population(); + + REQUIRE_THROWS_AS(future1.get(), std::future_error); + + mvec.parse(makePopulationReplyFrame()); + REQUIRE(future2.wait_for(0ms) == std::future_status::ready); + + auto reply = future2.get(); + REQUIRE(reply.is_valid()); +} + +// ============================================================================ +// Status getter tests +// ============================================================================ + +TEST_CASE("Status getters return nullopt before any data", "[mvec_relay_socketcan][status]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + REQUIRE_FALSE(mvec.get_last_fuse_status().has_value()); + REQUIRE_FALSE(mvec.get_last_relay_status().has_value()); + REQUIRE_FALSE(mvec.get_last_error_status().has_value()); +} + +TEST_CASE("Fuse status getter returns value after parse", "[mvec_relay_socketcan][status]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + J1939_ID fuse_status_id( + 6, 0, polymath::sygnal::MvecProtocol::STATUS_PDU, + 0x00 + polymath::sygnal::MvecProtocol::DEFAULT_PGN_BASE_VALUE, + polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); + auto frame = createTestFrame(fuse_status_id.get_can_id(), {0x00, 0x55, 0xAA, 0x33, 0xCC, 0x0F, 0xF0, 0x99}); + + auto msg_type = mvec.parse(frame); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::FUSE_STATUS); + + auto fuse_status = mvec.get_last_fuse_status(); + REQUIRE(fuse_status.has_value()); + REQUIRE(fuse_status->is_valid()); +} + +TEST_CASE("Relay status getter returns value after parse", "[mvec_relay_socketcan][status]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + J1939_ID relay_status_id( + 6, 0, polymath::sygnal::MvecProtocol::STATUS_PDU, + 0x01 + polymath::sygnal::MvecProtocol::DEFAULT_PGN_BASE_VALUE, + polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); + auto frame = createTestFrame(relay_status_id.get_can_id(), {0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77}); + + auto msg_type = mvec.parse(frame); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::RELAY_STATUS); + + auto relay_status = mvec.get_last_relay_status(); + REQUIRE(relay_status.has_value()); + REQUIRE(relay_status->is_valid()); +} + +TEST_CASE("Error status getter returns value after parse", "[mvec_relay_socketcan][status]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + + J1939_ID error_status_id( + 6, 0, polymath::sygnal::MvecProtocol::STATUS_PDU, + 0x02 + polymath::sygnal::MvecProtocol::DEFAULT_PGN_BASE_VALUE, + polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); + auto frame = createTestFrame( + error_status_id.get_can_id(), + {polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS, 0xFF, 0x1F, 0x00, 0x00, 0x00, 0x00, 0x00}); + + auto msg_type = mvec.parse(frame); + REQUIRE(msg_type == polymath::sygnal::MvecMessageType::ERROR_STATUS); + + auto error_status = mvec.get_last_error_status(); + REQUIRE(error_status.has_value()); + REQUIRE(error_status->is_valid()); +} + +// ============================================================================ +// Cross-type isolation tests +// ============================================================================ + +TEST_CASE("Query response does not fulfill command promise", "[mvec_relay_socketcan][isolation]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + mvec.set_relay_in_command(0, 1); + + auto query_future = mvec.get_relay_state(); + auto command_future = mvec.send_relay_command(); + + // Parse a query response — should only fulfill query, not command + mvec.parse(makeQueryReplyFrame()); + REQUIRE(query_future.wait_for(0ms) == std::future_status::ready); + REQUIRE(command_future.wait_for(0ms) == std::future_status::timeout); + + // Parse a command response — now command is fulfilled + mvec.parse(makeCommandReplyFrame()); + REQUIRE(command_future.wait_for(0ms) == std::future_status::ready); +} + +TEST_CASE("All three request types can be in-flight simultaneously", "[mvec_relay_socketcan][isolation]") +{ + auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); + mvec.set_relay_in_command(0, 1); + + auto query_future = mvec.get_relay_state(); + auto command_future = mvec.send_relay_command(); + auto population_future = mvec.get_relay_population(); + + // All pending + REQUIRE(query_future.wait_for(0ms) == std::future_status::timeout); + REQUIRE(command_future.wait_for(0ms) == std::future_status::timeout); + REQUIRE(population_future.wait_for(0ms) == std::future_status::timeout); + + // Fulfill in reverse order + mvec.parse(makePopulationReplyFrame()); + REQUIRE(population_future.wait_for(0ms) == std::future_status::ready); + REQUIRE(query_future.wait_for(0ms) == std::future_status::timeout); + REQUIRE(command_future.wait_for(0ms) == std::future_status::timeout); + + mvec.parse(makeCommandReplyFrame()); + REQUIRE(command_future.wait_for(0ms) == std::future_status::ready); + REQUIRE(query_future.wait_for(0ms) == std::future_status::timeout); + + mvec.parse(makeQueryReplyFrame()); + REQUIRE(query_future.wait_for(0ms) == std::future_status::ready); +} + +// ============================================================================ +// Thread safety test +// ============================================================================ + +TEST_CASE("Concurrent parse and query calls don't crash", "[mvec_relay_socketcan][threading]") +{ + auto mvec = std::make_shared(makeDummyAdapter()); + constexpr int ITERATIONS = 100; + + std::thread query_thread([&]() { + for (int i = 0; i < ITERATIONS; ++i) { + auto future = mvec->get_relay_state(); + future.wait_for(1ms); + } + }); + + std::thread parse_thread([&]() { + for (int i = 0; i < ITERATIONS; ++i) { + mvec->parse(makeQueryReplyFrame()); + } + }); + + query_thread.join(); + parse_thread.join(); +} From 7310ccbfb8f4ab2d41be6e960c0ed70da6955844 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 18 Mar 2026 12:16:42 +0000 Subject: [PATCH 7/8] update usage in hardware test --- .../mvec_lib/test/mvec_socketcan_hardware.cpp | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp b/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp index 1adccce..f9b79e5 100644 --- a/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp +++ b/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp @@ -64,10 +64,7 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") auto status = population_future.wait_for(std::chrono::seconds(5)); if (status == std::future_status::ready) { - auto population_result = population_future.get(); - REQUIRE(population_result.has_value()); - - const auto & population_reply = population_result.value(); + auto population_reply = population_future.get(); std::cout << "Population query successful! Valid: " << population_reply.is_valid() << std::endl; // Check that we got a valid response @@ -104,10 +101,7 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") auto status = relay_state_future.wait_for(std::chrono::seconds(5)); if (status == std::future_status::ready) { - auto relay_query_result = relay_state_future.get(); - REQUIRE(relay_query_result.has_value()); - - const auto & relay_query_reply = relay_query_result.value(); + auto relay_query_reply = relay_state_future.get(); std::cout << "Relay state query successful! Valid: " << relay_query_reply.is_valid() << std::endl; // Check that we got a valid response @@ -225,20 +219,18 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") REQUIRE(status == std::future_status::ready); - auto response_result = relay_command_response_future.get(); - REQUIRE(response_result.has_value()); + auto relay_command_reply = relay_command_response_future.get(); // 1 is no error - REQUIRE(response_result->get_success() == 1); + REQUIRE(relay_command_reply.get_success() == 1); std::cout << "Relay response message confirms success" << std::endl; auto mvec_query_future = mvec_socketcan->get_relay_state(); status = mvec_query_future.wait_for(std::chrono::seconds(5)); auto relay_state_result = mvec_query_future.get(); - REQUIRE(relay_state_result.has_value()); - REQUIRE(relay_state_result->get_relay_state(8) == 1); - REQUIRE(relay_state_result->get_relay_state(9) == 1); + REQUIRE(relay_state_result.get_relay_state(8) == 1); + REQUIRE(relay_state_result.get_relay_state(9) == 1); std::cout << "Relay states queried and confirm command" << std::endl; } From 33e5711640c8b0548409525a497b3361fabaadb1 Mon Sep 17 00:00:00 2001 From: Ryan Bahl Date: Wed, 18 Mar 2026 12:45:44 +0000 Subject: [PATCH 8/8] fix pre-commit errors --- mvec/mvec_lib/test/mvec_relay_socketcan.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/mvec/mvec_lib/test/mvec_relay_socketcan.cpp b/mvec/mvec_lib/test/mvec_relay_socketcan.cpp index cdb33f2..26e2af7 100644 --- a/mvec/mvec_lib/test/mvec_relay_socketcan.cpp +++ b/mvec/mvec_lib/test/mvec_relay_socketcan.cpp @@ -32,7 +32,7 @@ #include "socketcan_adapter/can_frame.hpp" #include "socketcan_adapter/socketcan_adapter.hpp" -using namespace std::chrono_literals; +using std::chrono_literals::operator""ms; static polymath::socketcan::CanFrame createTestFrame(uint32_t can_id, const std::vector & data) { @@ -251,7 +251,9 @@ TEST_CASE("Fuse status getter returns value after parse", "[mvec_relay_socketcan auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); J1939_ID fuse_status_id( - 6, 0, polymath::sygnal::MvecProtocol::STATUS_PDU, + 6, + 0, + polymath::sygnal::MvecProtocol::STATUS_PDU, 0x00 + polymath::sygnal::MvecProtocol::DEFAULT_PGN_BASE_VALUE, polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); auto frame = createTestFrame(fuse_status_id.get_can_id(), {0x00, 0x55, 0xAA, 0x33, 0xCC, 0x0F, 0xF0, 0x99}); @@ -269,7 +271,9 @@ TEST_CASE("Relay status getter returns value after parse", "[mvec_relay_socketca auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); J1939_ID relay_status_id( - 6, 0, polymath::sygnal::MvecProtocol::STATUS_PDU, + 6, + 0, + polymath::sygnal::MvecProtocol::STATUS_PDU, 0x01 + polymath::sygnal::MvecProtocol::DEFAULT_PGN_BASE_VALUE, polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); auto frame = createTestFrame(relay_status_id.get_can_id(), {0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77}); @@ -287,7 +291,9 @@ TEST_CASE("Error status getter returns value after parse", "[mvec_relay_socketca auto mvec = polymath::sygnal::MvecRelaySocketcan(makeDummyAdapter()); J1939_ID error_status_id( - 6, 0, polymath::sygnal::MvecProtocol::STATUS_PDU, + 6, + 0, + polymath::sygnal::MvecProtocol::STATUS_PDU, 0x02 + polymath::sygnal::MvecProtocol::DEFAULT_PGN_BASE_VALUE, polymath::sygnal::MvecProtocol::DEFAULT_SOURCE_ADDRESS); auto frame = createTestFrame(