diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp index d571953d9..739f1ac3e 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_continental/continental_srr520_hw_interface.hpp @@ -25,8 +25,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -114,17 +116,31 @@ class ContinentalSRR520HwInterface /// @param debug Target string void print_debug(std::string debug); - /// @brief Main loop of the CAN receiver thread + /// @brief Main loop of the classic CAN receiver thread void receive_loop(); + /// @brief Main loop of the CAN FD receiver thread + void receive_fd_loop(); + + /// @brief Main loop of the packet callback processing thread + void callback_processing_loop(); + std::unique_ptr<::drivers::socketcan::SocketCanReceiver> can_receiver_ptr_; + std::unique_ptr<::drivers::socketcan::SocketCanReceiver> can_fd_receiver_ptr_; std::unique_ptr<::drivers::socketcan::SocketCanSender> can_sender_ptr_; std::unique_ptr receiver_thread_ptr_; + std::unique_ptr receiver_fd_thread_ptr_; + std::unique_ptr callback_thread_ptr_; std::shared_ptr config_ptr_; std::function buffer)> nebula_packet_callback_; + // Thread-safe packet queue + std::queue> packet_queue_; + std::mutex packet_queue_mutex_; + std::condition_variable packet_queue_cv_; + std::mutex receiver_mutex_; bool sensor_interface_active_{}; diff --git a/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp b/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp index bd2d2eb5d..0b5767987 100644 --- a/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp +++ b/nebula_hw_interfaces/src/nebula_continental_hw_interfaces/continental_srr520_hw_interface.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -51,16 +52,34 @@ Status ContinentalSRR520HwInterface::sensor_interface_start() try { can_sender_ptr_ = std::make_unique<::drivers::socketcan::SocketCanSender>(config_ptr_->interface, true); + + // Create classic CAN receiver (enable_fd = false) can_receiver_ptr_ = + std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, false); + + // Create CAN FD receiver (enable_fd = true) + can_fd_receiver_ptr_ = std::make_unique<::drivers::socketcan::SocketCanReceiver>(config_ptr_->interface, true); can_receiver_ptr_->SetCanFilters( ::drivers::socketcan::SocketCanReceiver::CanFilterList(config_ptr_->filters)); + can_fd_receiver_ptr_->SetCanFilters( + ::drivers::socketcan::SocketCanReceiver::CanFilterList(config_ptr_->filters)); logger_->info(std::string("applied filters: ") + config_ptr_->filters); sensor_interface_active_ = true; + + // Start classic CAN receiver thread receiver_thread_ptr_ = std::make_unique(&ContinentalSRR520HwInterface::receive_loop, this); + + // Start CAN FD receiver thread + receiver_fd_thread_ptr_ = + std::make_unique(&ContinentalSRR520HwInterface::receive_fd_loop, this); + + // Start callback processing thread + callback_thread_ptr_ = + std::make_unique(&ContinentalSRR520HwInterface::callback_processing_loop, this); } catch (const std::exception & ex) { Status status = Status::CAN_CONNECTION_ERROR; logger_->error(std::string("Error connecting to CAN interface: ") + ex.what()); @@ -108,8 +127,68 @@ void ContinentalSRR520HwInterface::receive_loop() } try { - packet_msg_ptr->data.resize(68); // 64 bytes of data + 4 bytes of ID - receive_id = can_receiver_ptr_->receive_fd( + packet_msg_ptr->data.resize(12); // 8 bytes of data + 4 bytes of ID for classic CAN + receive_id = can_receiver_ptr_->receive( + packet_msg_ptr->data.data() + 4 * sizeof(uint8_t), receiver_timeout_nsec); + } catch (const std::exception & ex) { + logger_->error(std::string("Error receiving classic CAN message: ") + ex.what()); + continue; + } + + packet_msg_ptr->data.resize(receive_id.length() + 4); + + uint32_t id = receive_id.identifier(); + packet_msg_ptr->data[0] = (id & 0xFF000000) >> 24; + packet_msg_ptr->data[1] = (id & 0x00FF0000) >> 16; + packet_msg_ptr->data[2] = (id & 0x0000FF00) >> 8; + packet_msg_ptr->data[3] = (id & 0x000000FF) >> 0; + + int64_t stamp = use_bus_time + ? static_cast(receive_id.get_bus_time() * 1000U) + : static_cast(std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count()); + + packet_msg_ptr->stamp.sec = stamp / 1'000'000'000; + packet_msg_ptr->stamp.nanosec = stamp % 1'000'000'000; + + if (receive_id.frame_type() == ::drivers::socketcan::FrameType::ERROR) { + logger_->error("Classic CAN message is an error frame"); + continue; + } + + // Push packet to thread-safe queue instead of calling callback directly + { + std::lock_guard queue_lock(packet_queue_mutex_); + packet_queue_.push(std::move(packet_msg_ptr)); + } + packet_queue_cv_.notify_one(); + } +} + +void ContinentalSRR520HwInterface::receive_fd_loop() +{ + ::drivers::socketcan::CanId receive_id{}; + std::chrono::nanoseconds receiver_timeout_nsec; + bool use_bus_time = false; + + while (true) { + auto packet_msg_ptr = std::make_unique(); + + { + std::lock_guard lock(receiver_mutex_); + receiver_timeout_nsec = std::chrono::duration_cast( + std::chrono::duration(config_ptr_->receiver_timeout_sec)); + use_bus_time = config_ptr_->use_bus_time; + + if (!sensor_interface_active_) { + break; + } + } + + try { + packet_msg_ptr->data.resize(68); // 64 bytes of data + 4 bytes of ID for CAN FD + receive_id = can_fd_receiver_ptr_->receive_fd( packet_msg_ptr->data.data() + 4 * sizeof(uint8_t), receiver_timeout_nsec); } catch (const std::exception & ex) { logger_->error(std::string("Error receiving CAN FD message: ") + ex.what()); @@ -138,7 +217,47 @@ void ContinentalSRR520HwInterface::receive_loop() continue; } - nebula_packet_callback_(std::move(packet_msg_ptr)); + // Push packet to thread-safe queue instead of calling callback directly + { + std::lock_guard queue_lock(packet_queue_mutex_); + packet_queue_.push(std::move(packet_msg_ptr)); + } + packet_queue_cv_.notify_one(); + } +} + +void ContinentalSRR520HwInterface::callback_processing_loop() +{ + while (true) { + std::unique_ptr packet_ptr; + + // Wait for packets in the queue or check if we should exit + { + std::unique_lock queue_lock(packet_queue_mutex_); + packet_queue_cv_.wait(queue_lock, [this]() { + std::lock_guard receiver_lock(receiver_mutex_); + return !packet_queue_.empty() || !sensor_interface_active_; + }); + + // Check if we should exit + { + std::lock_guard receiver_lock(receiver_mutex_); + if (!sensor_interface_active_ && packet_queue_.empty()) { + break; + } + } + + // Get packet from queue if available + if (!packet_queue_.empty()) { + packet_ptr = std::move(packet_queue_.front()); + packet_queue_.pop(); + } + } + + // Process the packet outside the lock + if (packet_ptr && nebula_packet_callback_) { + nebula_packet_callback_(std::move(packet_ptr)); + } } } @@ -257,7 +376,21 @@ Status ContinentalSRR520HwInterface::sensor_interface_stop() sensor_interface_active_ = false; } - receiver_thread_ptr_->join(); + // Notify the callback processing thread to wake up and check the stop condition + packet_queue_cv_.notify_all(); + + if (receiver_thread_ptr_ && receiver_thread_ptr_->joinable()) { + receiver_thread_ptr_->join(); + } + + if (receiver_fd_thread_ptr_ && receiver_fd_thread_ptr_->joinable()) { + receiver_fd_thread_ptr_->join(); + } + + if (callback_thread_ptr_ && callback_thread_ptr_->joinable()) { + callback_thread_ptr_->join(); + } + return Status::ERROR_1; }