From cb9e2e9cab4185529d2ad78ebc2b5db2d772e3b6 Mon Sep 17 00:00:00 2001 From: Ayrton2718 Date: Mon, 22 Sep 2025 19:17:45 +0900 Subject: [PATCH 1/2] feat(srr520): implement multi-threaded CAN and CAN FD packet processing --- .../continental_srr520_hw_interface.hpp | 18 ++- .../continental_srr520_hw_interface.cpp | 141 +++++++++++++++++- 2 files changed, 154 insertions(+), 5 deletions(-) diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp index d571953d9..739f1ac3e 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp @@ -25,8 +25,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -114,17 +116,31 @@ class ContinentalSRR520HwInterface /// @param debug Target string void print_debug(std::string debug); - /// @brief Main loop of the CAN receiver thread + /// @brief Main loop of the classic CAN receiver thread void receive_loop(); + /// @brief Main loop of the CAN FD receiver thread + void receive_fd_loop(); + + /// @brief Main loop of the packet callback processing thread + void callback_processing_loop(); + std::unique_ptr<::drivers::socketcan::SocketCanReceiver> can_receiver_ptr_; + std::unique_ptr<::drivers::socketcan::SocketCanReceiver> can_fd_receiver_ptr_; std::unique_ptr<::drivers::socketcan::SocketCanSender> can_sender_ptr_; std::unique_ptr receiver_thread_ptr_; + std::unique_ptr receiver_fd_thread_ptr_; + std::unique_ptr callback_thread_ptr_; std::shared_ptr config_ptr_; std::function buffer)> nebula_packet_callback_; + // Thread-safe packet queue + std::queue> packet_queue_; + std::mutex packet_queue_mutex_; + std::condition_variable packet_queue_cv_; + std::mutex receiver_mutex_; bool sensor_interface_active_{}; diff --git a/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp b/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp index bd2d2eb5d..183a223c7 100644 --- a/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp +++ b/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -51,16 +52,34 @@ Status ContinentalSRR520HwInterface::sensor_interface_start() try { can_sender_ptr_ = std::make_unique<::drivers::socketcan::SocketCanSender>(config_ptr_->interface, true); + + // Create classic CAN receiver (enable_fd = false) can_receiver_ptr_ = + std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, false); + + // Create CAN FD receiver (enable_fd = true) + can_fd_receiver_ptr_ = std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, true); can_receiver_ptr_->SetCanFilters( ::drivers::socketcan::SocketCanReceiver::CanFilterList(config_ptr_->filters)); + can_fd_receiver_ptr_->SetCanFilters( + ::drivers::socketcan::SocketCanReceiver::CanFilterList(config_ptr_->filters)); logger_->info(std::string("applied filters: ") + config_ptr_->filters); sensor_interface_active_ = true; + + // Start classic CAN receiver thread receiver_thread_ptr_ = std::make_unique(&ContinentalSRR520HwInterface::receive_loop, this); + + // Start CAN FD receiver thread + receiver_fd_thread_ptr_ = + std::make_unique(&ContinentalSRR520HwInterface::receive_fd_loop, this); + + // Start callback processing thread + callback_thread_ptr_ = + std::make_unique(&ContinentalSRR520HwInterface::callback_processing_loop, this); } catch (const std::exception & ex) { Status status = Status::CAN_CONNECTION_ERROR; logger_->error(std::string("Error connecting to CAN interface: ") + ex.what()); @@ -108,8 +127,68 @@ void ContinentalSRR520HwInterface::receive_loop() } try { - packet_msg_ptr->data.resize(68); // 64 bytes of data + 4 bytes of ID - receive_id = can_receiver_ptr_->receive_fd( + packet_msg_ptr->data.resize(12); // 8 bytes of data + 4 bytes of ID for classic CAN + receive_id = can_receiver_ptr_->receive( + packet_msg_ptr->data.data() + 4 * sizeof(uint8_t), receiver_timeout_nsec); + } catch (const std::exception & ex) { + logger_->error(std::string("Error receiving classic CAN message: ") + ex.what()); + continue; + } + + packet_msg_ptr->data.resize(receive_id.length() + 4); + + uint32_t id = receive_id.identifier(); + packet_msg_ptr->data[0] = (id & 0xFF000000) >> 24; + packet_msg_ptr->data[1] = (id & 0x00FF0000) >> 16; + packet_msg_ptr->data[2] = (id & 0x0000FF00) >> 8; + packet_msg_ptr->data[3] = (id & 0x000000FF) >> 0; + + int64_t stamp = use_bus_time + ? static_cast(receive_id.get_bus_time() * 1000U) + : static_cast(std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count()); + + packet_msg_ptr->stamp.sec = stamp / 1'000'000'000; + packet_msg_ptr->stamp.nanosec = stamp % 1'000'000'000; + + if (receive_id.frame_type() == ::drivers::socketcan::FrameType::ERROR) { + logger_->error("Classic CAN message is an error frame"); + continue; + } + + // Push packet to thread-safe queue instead of calling callback directly + { + std::lock_guard queue_lock(packet_queue_mutex_); + packet_queue_.push(std::move(packet_msg_ptr)); + } + packet_queue_cv_.notify_one(); + } +} + +void ContinentalSRR520HwInterface::receive_fd_loop() +{ + ::drivers::socketcan::CanId receive_id{}; + std::chrono::nanoseconds receiver_timeout_nsec; + bool use_bus_time = false; + + while (true) { + auto packet_msg_ptr = std::make_unique(); + + { + std::lock_guard lock(receiver_mutex_); + receiver_timeout_nsec = std::chrono::duration_cast( + std::chrono::duration(config_ptr_->receiver_timeout_sec)); + use_bus_time = config_ptr_->use_bus_time; + + if (!sensor_interface_active_) { + break; + } + } + + try { + packet_msg_ptr->data.resize(68); // 64 bytes of data + 4 bytes of ID for CAN FD + receive_id = can_fd_receiver_ptr_->receive_fd( packet_msg_ptr->data.data() + 4 * sizeof(uint8_t), receiver_timeout_nsec); } catch (const std::exception & ex) { logger_->error(std::string("Error receiving CAN FD message: ") + ex.what()); @@ -138,7 +217,47 @@ void ContinentalSRR520HwInterface::receive_loop() continue; } - nebula_packet_callback_(std::move(packet_msg_ptr)); + // Push packet to thread-safe queue instead of calling callback directly + { + std::lock_guard queue_lock(packet_queue_mutex_); + packet_queue_.push(std::move(packet_msg_ptr)); + } + packet_queue_cv_.notify_one(); + } +} + +void ContinentalSRR520HwInterface::callback_processing_loop() +{ + while (true) { + std::unique_ptr packet_ptr; + + // Wait for packets in the queue or check if we should exit + { + std::unique_lock queue_lock(packet_queue_mutex_); + packet_queue_cv_.wait(queue_lock, [this]() { + std::lock_guard receiver_lock(receiver_mutex_); + return !packet_queue_.empty() || !sensor_interface_active_; + }); + + // Check if we should exit + { + std::lock_guard receiver_lock(receiver_mutex_); + if (!sensor_interface_active_ && packet_queue_.empty()) { + break; + } + } + + // Get packet from queue if available + if (!packet_queue_.empty()) { + packet_ptr = std::move(packet_queue_.front()); + packet_queue_.pop(); + } + } + + // Process the packet outside the lock + if (packet_ptr && nebula_packet_callback_) { + nebula_packet_callback_(std::move(packet_ptr)); + } } } @@ -256,8 +375,22 @@ Status ContinentalSRR520HwInterface::sensor_interface_stop() std::lock_guard l(receiver_mutex_); sensor_interface_active_ = false; } + + // Notify the callback processing thread to wake up and check the stop condition + packet_queue_cv_.notify_all(); - receiver_thread_ptr_->join(); + if (receiver_thread_ptr_ && receiver_thread_ptr_->joinable()) { + receiver_thread_ptr_->join(); + } + + if (receiver_fd_thread_ptr_ && receiver_fd_thread_ptr_->joinable()) { + receiver_fd_thread_ptr_->join(); + } + + if (callback_thread_ptr_ && callback_thread_ptr_->joinable()) { + callback_thread_ptr_->join(); + } + return Status::ERROR_1; } From cfac0680f39295ae93ba1c4894048f24ab50fe80 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:39:20 +0000 Subject: [PATCH 2/2] ci(pre-commit): autofix --- .../continental_srr520_hw_interface.cpp | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp b/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp index 183a223c7..0b5767987 100644 --- a/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp +++ b/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp @@ -52,12 +52,12 @@ Status ContinentalSRR520HwInterface::sensor_interface_start() try { can_sender_ptr_ = std::make_unique<::drivers::socketcan::SocketCanSender>(config_ptr_->interface, true); - + // Create classic CAN receiver (enable_fd = false) can_receiver_ptr_ = std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, false); - - // Create CAN FD receiver (enable_fd = true) + + // Create CAN FD receiver (enable_fd = true) can_fd_receiver_ptr_ = std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, true); @@ -68,15 +68,15 @@ Status ContinentalSRR520HwInterface::sensor_interface_start() logger_->info(std::string("applied filters: ") + config_ptr_->filters); sensor_interface_active_ = true; - + // Start classic CAN receiver thread receiver_thread_ptr_ = std::make_unique(&ContinentalSRR520HwInterface::receive_loop, this); - - // Start CAN FD receiver thread + + // Start CAN FD receiver thread receiver_fd_thread_ptr_ = std::make_unique(&ContinentalSRR520HwInterface::receive_fd_loop, this); - + // Start callback processing thread callback_thread_ptr_ = std::make_unique(&ContinentalSRR520HwInterface::callback_processing_loop, this); @@ -230,7 +230,7 @@ void ContinentalSRR520HwInterface::callback_processing_loop() { while (true) { std::unique_ptr packet_ptr; - + // Wait for packets in the queue or check if we should exit { std::unique_lock queue_lock(packet_queue_mutex_); @@ -238,7 +238,7 @@ void ContinentalSRR520HwInterface::callback_processing_loop() std::lock_guard receiver_lock(receiver_mutex_); return !packet_queue_.empty() || !sensor_interface_active_; }); - + // Check if we should exit { std::lock_guard receiver_lock(receiver_mutex_); @@ -246,14 +246,14 @@ void ContinentalSRR520HwInterface::callback_processing_loop() break; } } - + // Get packet from queue if available if (!packet_queue_.empty()) { packet_ptr = std::move(packet_queue_.front()); packet_queue_.pop(); } } - + // Process the packet outside the lock if (packet_ptr && nebula_packet_callback_) { nebula_packet_callback_(std::move(packet_ptr)); @@ -375,22 +375,22 @@ Status ContinentalSRR520HwInterface::sensor_interface_stop() std::lock_guard l(receiver_mutex_); sensor_interface_active_ = false; } - + // Notify the callback processing thread to wake up and check the stop condition packet_queue_cv_.notify_all(); if (receiver_thread_ptr_ && receiver_thread_ptr_->joinable()) { receiver_thread_ptr_->join(); } - + if (receiver_fd_thread_ptr_ && receiver_fd_thread_ptr_->joinable()) { receiver_fd_thread_ptr_->join(); } - + if (callback_thread_ptr_ && callback_thread_ptr_->joinable()) { callback_thread_ptr_->join(); } - + return Status::ERROR_1; }