From 1a84d8bf73384aab4cdf69c231ecee3a1043938c Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 22 Jul 2024 22:14:32 +0900 Subject: [PATCH 1/5] chore: add missing includes --- .../nebula_decoders_hesai/decoders/hesai_packet.hpp | 1 + nebula_decoders/src/nebula_decoders_hesai/hesai_driver.cpp | 2 ++ .../nebula_hw_interfaces_hesai/hesai_cmd_response.hpp | 1 + 3 files changed, 4 insertions(+) diff --git a/nebula_decoders/include/nebula_decoders/nebula_decoders_hesai/decoders/hesai_packet.hpp b/nebula_decoders/include/nebula_decoders/nebula_decoders_hesai/decoders/hesai_packet.hpp index 4b1ab3147..e2dafb30a 100644 --- a/nebula_decoders/include/nebula_decoders/nebula_decoders_hesai/decoders/hesai_packet.hpp +++ b/nebula_decoders/include/nebula_decoders/nebula_decoders_hesai/decoders/hesai_packet.hpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace nebula { diff --git a/nebula_decoders/src/nebula_decoders_hesai/hesai_driver.cpp b/nebula_decoders/src/nebula_decoders_hesai/hesai_driver.cpp index 22269bba7..6258c59ab 100644 --- a/nebula_decoders/src/nebula_decoders_hesai/hesai_driver.cpp +++ b/nebula_decoders/src/nebula_decoders_hesai/hesai_driver.cpp @@ -13,6 +13,8 @@ #include "nebula_decoders/nebula_decoders_hesai/decoders/pandar_xt32.hpp" #include "nebula_decoders/nebula_decoders_hesai/decoders/pandar_xt32m.hpp" +#include + // #define WITH_DEBUG_STD_COUT_HESAI_CLIENT // Use std::cout messages for debugging namespace nebula diff --git a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp index b51ce3b3b..54e757224 100644 --- a/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp +++ b/nebula_hw_interfaces/include/nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_cmd_response.hpp @@ -21,6 +21,7 @@ #include #include +#include #include #include From 3de7995214792c494f9d9c77ddf13c14dcabc090 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Mon, 22 Jul 2024 22:27:31 +0900 Subject: [PATCH 2/5] perf(hesai): move mt_queue from before decoder to after it, improving performance --- .../nebula_ros/hesai/decoder_wrapper.hpp | 35 +++++++-- .../nebula_ros/hesai/hesai_ros_wrapper.hpp | 7 -- nebula_ros/src/hesai/decoder_wrapper.cpp | 78 +++++++++++++------ nebula_ros/src/hesai/hesai_ros_wrapper.cpp | 20 ++--- 4 files changed, 85 insertions(+), 55 deletions(-) diff --git a/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp b/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp index 396f16d23..f74506710 100644 --- a/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/hesai/decoder_wrapper.hpp @@ -16,14 +16,18 @@ #include "nebula_decoders/nebula_decoders_hesai/hesai_driver.hpp" #include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp" +#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/parameter_descriptors.hpp" #include "nebula_ros/common/watchdog_timer.hpp" #include #include +#include #include +#include "nebula_msgs/msg/detail/nebula_packets__struct.hpp" #include "nebula_msgs/msg/nebula_packet.hpp" +#include "nebula_msgs/msg/nebula_packets.hpp" #include "pandar_msgs/msg/pandar_packet.hpp" #include "pandar_msgs/msg/pandar_scan.hpp" @@ -31,6 +35,9 @@ #include #include #include +#include +#include +#include #include namespace nebula @@ -63,6 +70,13 @@ class HesaiDecoderWrapper nebula::Status Status(); private: + struct PublishData + { + nebula_msgs::msg::NebulaPackets::UniquePtr packets; + drivers::NebulaPointCloudPtr cloud; + double cloud_timestamp_s; + }; + /// @brief Load calibration data from the best available source: /// 1. If sensor connected, download and save from sensor /// 2. If downloaded file available, load that file @@ -74,6 +88,8 @@ class HesaiDecoderWrapper get_calibration_result_t GetCalibrationData( const std::string & calibration_file_path, bool ignore_others = false); + void publish(PublishData && data); + void PublishCloud( std::unique_ptr pointcloud, const rclcpp::Publisher::SharedPtr & publisher); @@ -93,18 +109,21 @@ class HesaiDecoderWrapper const std::shared_ptr hw_interface_; std::shared_ptr sensor_cfg_; - std::string calibration_file_path_{}; - std::shared_ptr calibration_cfg_ptr_{}; + std::string calibration_file_path_; + std::shared_ptr calibration_cfg_ptr_; - std::shared_ptr driver_ptr_{}; + std::shared_ptr driver_ptr_; std::mutex mtx_driver_ptr_; - rclcpp::Publisher::SharedPtr packets_pub_{}; - pandar_msgs::msg::PandarScan::UniquePtr current_scan_msg_{}; + mt_queue publish_queue_; + std::thread pub_thread_; + + rclcpp::Publisher::SharedPtr packets_pub_; + nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_; - rclcpp::Publisher::SharedPtr nebula_points_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_ex_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_base_pub_{}; + rclcpp::Publisher::SharedPtr nebula_points_pub_; + rclcpp::Publisher::SharedPtr aw_points_ex_pub_; + rclcpp::Publisher::SharedPtr aw_points_base_pub_; std::shared_ptr cloud_watchdog_; }; diff --git a/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp index 8a70aa0e3..1d1d95312 100644 --- a/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/hesai/hesai_ros_wrapper.hpp @@ -18,8 +18,6 @@ #include "nebula_common/hesai/hesai_common.hpp" #include "nebula_common/nebula_common.hpp" #include "nebula_common/nebula_status.hpp" -#include "nebula_hw_interfaces/nebula_hw_interfaces_hesai/hesai_hw_interface.hpp" -#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/parameter_descriptors.hpp" #include "nebula_ros/hesai/decoder_wrapper.hpp" #include "nebula_ros/hesai/hw_interface_wrapper.hpp" @@ -83,11 +81,6 @@ class HesaiRosWrapper final : public rclcpp::Node std::shared_ptr sensor_cfg_ptr_{}; - /// @brief Stores received packets that have not been processed yet by the decoder thread - mt_queue> packet_queue_; - /// @brief Thread to isolate decoding from receiving - std::thread decoder_thread_; - rclcpp::Subscription::SharedPtr packets_sub_{}; bool launch_hw_; diff --git a/nebula_ros/src/hesai/decoder_wrapper.cpp b/nebula_ros/src/hesai/decoder_wrapper.cpp index b13d9d782..75137decc 100644 --- a/nebula_ros/src/hesai/decoder_wrapper.cpp +++ b/nebula_ros/src/hesai/decoder_wrapper.cpp @@ -2,6 +2,16 @@ #include "nebula_ros/hesai/decoder_wrapper.hpp" +#include + +#include +#include + +#include +#include +#include +#include + #pragma clang diagnostic ignored "-Wbitwise-instead-of-logical" namespace nebula { @@ -17,7 +27,9 @@ HesaiDecoderWrapper::HesaiDecoderWrapper( : status_(nebula::Status::NOT_INITIALIZED), logger_(parent_node->get_logger().get_child("HesaiDecoder")), hw_interface_(hw_interface), - sensor_cfg_(config) + sensor_cfg_(config), + publish_queue_(1), + current_scan_msg_(std::make_unique()) { if (!config) { throw std::runtime_error("HesaiDecoderWrapper cannot be instantiated without a valid config!"); @@ -54,14 +66,12 @@ HesaiDecoderWrapper::HesaiDecoderWrapper( // Publish packets only if HW interface is connected if (hw_interface_) { - current_scan_msg_ = std::make_unique(); packets_pub_ = parent_node->create_publisher( "pandar_packets", rclcpp::SensorDataQoS()); } auto qos_profile = rmw_qos_profile_sensor_data; - auto pointcloud_qos = - rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 10), qos_profile); + auto pointcloud_qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 1), qos_profile); nebula_points_pub_ = parent_node->create_publisher("pandar_points", pointcloud_qos); @@ -78,6 +88,13 @@ HesaiDecoderWrapper::HesaiDecoderWrapper( RCLCPP_WARN_THROTTLE( logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline"); }); + + pub_thread_ = std::thread([&]() { + while (true) { + auto publish_data = publish_queue_.pop(); + publish(std::move(publish_data)); + } + }); } void HesaiDecoderWrapper::OnConfigChange( @@ -221,26 +238,24 @@ HesaiDecoderWrapper::get_calibration_result_t HesaiDecoderWrapper::GetCalibratio void HesaiDecoderWrapper::ProcessCloudPacket( std::unique_ptr packet_msg) { + auto & packet = *packet_msg; + // Accumulate packets for recording only if someone is subscribed to the topic (for performance) - if ( - hw_interface_ && (packets_pub_->get_subscription_count() > 0 || - packets_pub_->get_intra_process_subscription_count() > 0)) { + if (hw_interface_) { if (current_scan_msg_->packets.size() == 0) { current_scan_msg_->header.stamp = packet_msg->stamp; + current_scan_msg_->header.frame_id = sensor_cfg_->frame_id; } - pandar_msgs::msg::PandarPacket pandar_packet_msg{}; - pandar_packet_msg.stamp = packet_msg->stamp; - pandar_packet_msg.size = packet_msg->data.size(); - std::copy(packet_msg->data.begin(), packet_msg->data.end(), pandar_packet_msg.data.begin()); - current_scan_msg_->packets.emplace_back(std::move(pandar_packet_msg)); + current_scan_msg_->packets.emplace_back(std::move(*packet_msg)); + packet = current_scan_msg_->packets.back(); } std::tuple pointcloud_ts{}; nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr; { std::lock_guard lock(mtx_driver_ptr_); - pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data); + pointcloud_ts = driver_ptr_->ParseCloudPacket(packet.data); pointcloud = std::get<0>(pointcloud_ts); } @@ -253,12 +268,28 @@ void HesaiDecoderWrapper::ProcessCloudPacket( return; } + publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)}); + current_scan_msg_ = std::make_unique(); +} + +void HesaiDecoderWrapper::publish(PublishData && data) +{ + auto pointcloud = data.cloud; + auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count()); + cloud_watchdog_->update(); - // Publish scan message only if it has been written to - if (current_scan_msg_ && !current_scan_msg_->packets.empty()) { - packets_pub_->publish(std::move(current_scan_msg_)); - current_scan_msg_ = std::make_unique(); + if (!data.packets->packets.empty()) { + auto pandar_scan = std::make_unique(); + pandar_scan->header = data.packets->header; + + for (const auto & pkt : data.packets->packets) { + auto & pandar_pkt = pandar_scan->packets.emplace_back(); + pandar_pkt.stamp = pkt.stamp; + std::copy(pkt.data.begin(), pkt.data.end(), pandar_pkt.data.begin()); + } + + packets_pub_->publish(std::move(pandar_scan)); } if ( @@ -266,8 +297,7 @@ void HesaiDecoderWrapper::ProcessCloudPacket( nebula_points_pub_->get_intra_process_subscription_count() > 0) { auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_); } if ( @@ -277,19 +307,17 @@ void HesaiDecoderWrapper::ProcessCloudPacket( nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_); } if ( aw_points_ex_pub_->get_subscription_count() > 0 || aw_points_ex_pub_->get_intra_process_subscription_count() > 0) { - const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT( - pointcloud, std::get<1>(pointcloud_ts)); + const auto autoware_ex_cloud = + nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_); } } diff --git a/nebula_ros/src/hesai/hesai_ros_wrapper.cpp b/nebula_ros/src/hesai/hesai_ros_wrapper.cpp index 132d0f4d2..ce6b6e544 100644 --- a/nebula_ros/src/hesai/hesai_ros_wrapper.cpp +++ b/nebula_ros/src/hesai/hesai_ros_wrapper.cpp @@ -2,6 +2,8 @@ #include "nebula_ros/hesai/hesai_ros_wrapper.hpp" +#include + #pragma clang diagnostic ignored "-Wbitwise-instead-of-logical" namespace nebula @@ -11,11 +13,7 @@ namespace ros HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options) : rclcpp::Node("hesai_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)), wrapper_status_(Status::NOT_INITIALIZED), - sensor_cfg_ptr_(nullptr), - packet_queue_(3000), - hw_interface_wrapper_(), - hw_monitor_wrapper_(), - decoder_wrapper_() + sensor_cfg_ptr_(nullptr) { setvbuf(stdout, NULL, _IONBF, BUFSIZ); @@ -40,12 +38,6 @@ HesaiRosWrapper::HesaiRosWrapper(const rclcpp::NodeOptions & options) RCLCPP_DEBUG(get_logger(), "Starting stream"); - decoder_thread_ = std::thread([this]() { - while (true) { - decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop()); - } - }); - if (launch_hw_) { hw_interface_wrapper_->HwInterface()->RegisterScanCallback( std::bind(&HesaiRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1)); @@ -212,7 +204,7 @@ void HesaiRosWrapper::ReceiveScanMessageCallback( nebula_pkt_ptr->stamp = pkt.stamp; std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data)); - packet_queue_.push(std::move(nebula_pkt_ptr)); + decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr)); } } @@ -304,9 +296,7 @@ void HesaiRosWrapper::ReceiveCloudPacketCallback(std::vector & packet) msg_ptr->stamp.nanosec = static_cast(timestamp_ns % 1'000'000'000); msg_ptr->data.swap(packet); - if (!packet_queue_.try_push(std::move(msg_ptr))) { - RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped"); - } + decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr)); } RCLCPP_COMPONENTS_REGISTER_NODE(HesaiRosWrapper) From f77ee3b0053c18cf45bb1819ffc2d3f663674483 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 23 Jul 2024 14:25:36 +0900 Subject: [PATCH 3/5] perf(velodyne): move mt_queue from before decoder to after it, improving performance --- .../nebula_ros/velodyne/decoder_wrapper.hpp | 30 +++++--- .../velodyne/velodyne_ros_wrapper.hpp | 6 -- nebula_ros/src/velodyne/decoder_wrapper.cpp | 69 +++++++++++++------ .../src/velodyne/velodyne_ros_wrapper.cpp | 18 +---- 4 files changed, 72 insertions(+), 51 deletions(-) diff --git a/nebula_ros/include/nebula_ros/velodyne/decoder_wrapper.hpp b/nebula_ros/include/nebula_ros/velodyne/decoder_wrapper.hpp index e78e51e0c..118c69c3c 100644 --- a/nebula_ros/include/nebula_ros/velodyne/decoder_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/velodyne/decoder_wrapper.hpp @@ -14,6 +14,7 @@ #pragma once +#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/parameter_descriptors.hpp" #include "nebula_ros/common/watchdog_timer.hpp" @@ -25,6 +26,7 @@ #include #include +#include #include #include @@ -64,11 +66,20 @@ class VelodyneDecoderWrapper nebula::Status Status(); private: + struct PublishData + { + nebula_msgs::msg::NebulaPackets::UniquePtr packets; + drivers::NebulaPointCloudPtr cloud; + double cloud_timestamp_s; + }; + /// @brief Load calibration data from file /// @param calibration_file_path The file to read from /// @return The calibration data if successful, or an error code if not get_calibration_result_t GetCalibrationData(const std::string & calibration_file_path); + void publish(PublishData && data); + void PublishCloud( std::unique_ptr pointcloud, const rclcpp::Publisher::SharedPtr & publisher); @@ -88,18 +99,21 @@ class VelodyneDecoderWrapper const std::shared_ptr hw_interface_; std::shared_ptr sensor_cfg_; - std::string calibration_file_path_{}; - std::shared_ptr calibration_cfg_ptr_{}; + std::string calibration_file_path_; + std::shared_ptr calibration_cfg_ptr_; - std::shared_ptr driver_ptr_{}; + std::shared_ptr driver_ptr_; std::mutex mtx_driver_ptr_; - rclcpp::Publisher::SharedPtr packets_pub_{}; - velodyne_msgs::msg::VelodyneScan::UniquePtr current_scan_msg_{}; + mt_queue publish_queue_; + std::thread pub_thread_; + + rclcpp::Publisher::SharedPtr packets_pub_; + nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_; - rclcpp::Publisher::SharedPtr nebula_points_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_ex_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_base_pub_{}; + rclcpp::Publisher::SharedPtr nebula_points_pub_; + rclcpp::Publisher::SharedPtr aw_points_ex_pub_; + rclcpp::Publisher::SharedPtr aw_points_base_pub_; std::shared_ptr cloud_watchdog_; }; diff --git a/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp index 2ed7c4038..ad2a4f817 100644 --- a/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp @@ -14,7 +14,6 @@ #pragma once -#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/parameter_descriptors.hpp" #include "nebula_ros/velodyne/decoder_wrapper.hpp" #include "nebula_ros/velodyne/hw_interface_wrapper.hpp" @@ -84,11 +83,6 @@ class VelodyneRosWrapper final : public rclcpp::Node std::shared_ptr sensor_cfg_ptr_{}; - /// @brief Stores received packets that have not been processed yet by the decoder thread - mt_queue> packet_queue_; - /// @brief Thread to isolate decoding from receiving - std::thread decoder_thread_; - rclcpp::Subscription::SharedPtr packets_sub_{}; bool launch_hw_; diff --git a/nebula_ros/src/velodyne/decoder_wrapper.cpp b/nebula_ros/src/velodyne/decoder_wrapper.cpp index f55d7f28e..88eaf99e1 100644 --- a/nebula_ros/src/velodyne/decoder_wrapper.cpp +++ b/nebula_ros/src/velodyne/decoder_wrapper.cpp @@ -2,6 +2,11 @@ #include "nebula_ros/velodyne/decoder_wrapper.hpp" +#include "nebula_msgs/msg/detail/nebula_packets__struct.hpp" +#include + +#include + namespace nebula { namespace ros @@ -16,7 +21,9 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper( : status_(nebula::Status::NOT_INITIALIZED), logger_(parent_node->get_logger().get_child("VelodyneDecoder")), hw_interface_(hw_interface), - sensor_cfg_(config) + sensor_cfg_(config), + publish_queue_(1), + current_scan_msg_(std::make_unique()) { if (!config) { throw std::runtime_error( @@ -48,14 +55,12 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper( // Publish packets only if HW interface is connected if (hw_interface_) { - current_scan_msg_ = std::make_unique(); packets_pub_ = parent_node->create_publisher( "velodyne_packets", rclcpp::SensorDataQoS()); } auto qos_profile = rmw_qos_profile_sensor_data; - auto pointcloud_qos = - rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 10), qos_profile); + auto pointcloud_qos = rclcpp::QoS(rclcpp::QoSInitialization(qos_profile.history, 1), qos_profile); nebula_points_pub_ = parent_node->create_publisher("velodyne_points", pointcloud_qos); @@ -72,6 +77,13 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper( RCLCPP_WARN_THROTTLE( logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline"); }); + + pub_thread_ = std::thread([&]() { + while (true) { + auto publish_data = publish_queue_.pop(); + publish(std::move(publish_data)); + } + }); } void VelodyneDecoderWrapper::OnConfigChange( @@ -158,25 +170,24 @@ VelodyneDecoderWrapper::get_calibration_result_t VelodyneDecoderWrapper::GetCali void VelodyneDecoderWrapper::ProcessCloudPacket( std::unique_ptr packet_msg) { + auto & packet = *packet_msg; + // Accumulate packets for recording only if someone is subscribed to the topic (for performance) - if ( - hw_interface_ && (packets_pub_->get_subscription_count() > 0 || - packets_pub_->get_intra_process_subscription_count() > 0)) { + if (hw_interface_) { if (current_scan_msg_->packets.size() == 0) { current_scan_msg_->header.stamp = packet_msg->stamp; + current_scan_msg_->header.frame_id = sensor_cfg_->frame_id; } - velodyne_msgs::msg::VelodynePacket velodyne_packet_msg{}; - velodyne_packet_msg.stamp = packet_msg->stamp; - std::copy(packet_msg->data.begin(), packet_msg->data.end(), velodyne_packet_msg.data.begin()); - current_scan_msg_->packets.emplace_back(std::move(velodyne_packet_msg)); + current_scan_msg_->packets.emplace_back(std::move(*packet_msg)); + packet = current_scan_msg_->packets.back(); } std::tuple pointcloud_ts{}; nebula::drivers::NebulaPointCloudPtr pointcloud = nullptr; { std::lock_guard lock(mtx_driver_ptr_); - pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data, packet_msg->stamp.sec); + pointcloud_ts = driver_ptr_->ParseCloudPacket(packet.data, packet.stamp.sec); pointcloud = std::get<0>(pointcloud_ts); } @@ -184,12 +195,29 @@ void VelodyneDecoderWrapper::ProcessCloudPacket( return; } + publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)}); + current_scan_msg_ = std::make_unique(); +} + +void VelodyneDecoderWrapper::publish(PublishData && data) +{ + auto pointcloud = data.cloud; + auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count()); + cloud_watchdog_->update(); // Publish scan message only if it has been written to if (current_scan_msg_ && !current_scan_msg_->packets.empty()) { - packets_pub_->publish(std::move(current_scan_msg_)); - current_scan_msg_ = std::make_unique(); + auto velodyne_scan = std::make_unique(); + velodyne_scan->header = data.packets->header; + + for (const auto & pkt : data.packets->packets) { + auto & velodyne_pkt = velodyne_scan->packets.emplace_back(); + velodyne_pkt.stamp = pkt.stamp; + std::copy(pkt.data.begin(), pkt.data.end(), velodyne_pkt.data.begin()); + } + + packets_pub_->publish(std::move(velodyne_scan)); } if ( @@ -197,8 +225,7 @@ void VelodyneDecoderWrapper::ProcessCloudPacket( nebula_points_pub_->get_intra_process_subscription_count() > 0) { auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_); } if ( @@ -208,19 +235,17 @@ void VelodyneDecoderWrapper::ProcessCloudPacket( nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_); } if ( aw_points_ex_pub_->get_subscription_count() > 0 || aw_points_ex_pub_->get_intra_process_subscription_count() > 0) { - const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT( - pointcloud, std::get<1>(pointcloud_ts)); + const auto autoware_ex_cloud = + nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_); } } diff --git a/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp b/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp index 808467e8c..51a1e8c86 100644 --- a/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp +++ b/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp @@ -11,11 +11,7 @@ namespace ros VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options) : rclcpp::Node("velodyne_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)), wrapper_status_(Status::NOT_INITIALIZED), - sensor_cfg_ptr_(nullptr), - packet_queue_(3000), - hw_interface_wrapper_(), - hw_monitor_wrapper_(), - decoder_wrapper_() + sensor_cfg_ptr_(nullptr) { setvbuf(stdout, NULL, _IONBF, BUFSIZ); @@ -40,12 +36,6 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options) RCLCPP_DEBUG(get_logger(), "Starting stream"); - decoder_thread_ = std::thread([this]() { - while (true) { - decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop()); - } - }); - if (launch_hw_) { hw_interface_wrapper_->HwInterface()->RegisterScanCallback( std::bind(&VelodyneRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1)); @@ -156,7 +146,7 @@ void VelodyneRosWrapper::ReceiveScanMessageCallback( nebula_pkt_ptr->stamp = pkt.stamp; std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data)); - packet_queue_.push(std::move(nebula_pkt_ptr)); + decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr)); } } @@ -247,9 +237,7 @@ void VelodyneRosWrapper::ReceiveCloudPacketCallback(std::vector & packe msg_ptr->stamp.nanosec = static_cast(timestamp_ns % 1'000'000'000); msg_ptr->data.swap(packet); - if (!packet_queue_.try_push(std::move(msg_ptr))) { - RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped"); - } + decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr)); } RCLCPP_COMPONENTS_REGISTER_NODE(VelodyneRosWrapper) From 3e2239f50b68f412ac605c17d02ce013cc767f88 Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 23 Jul 2024 14:59:16 +0900 Subject: [PATCH 4/5] perf(robosense): move mt_queue from before decoder to after it, improving performance --- .../nebula_ros/robosense/decoder_wrapper.hpp | 26 ++++-- .../robosense/robosense_ros_wrapper.hpp | 10 +- nebula_ros/src/robosense/decoder_wrapper.cpp | 92 +++++++++++-------- .../src/robosense/robosense_ros_wrapper.cpp | 18 +--- 4 files changed, 81 insertions(+), 65 deletions(-) diff --git a/nebula_ros/include/nebula_ros/robosense/decoder_wrapper.hpp b/nebula_ros/include/nebula_ros/robosense/decoder_wrapper.hpp index f0d858e59..c2d0c1c47 100644 --- a/nebula_ros/include/nebula_ros/robosense/decoder_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/robosense/decoder_wrapper.hpp @@ -14,6 +14,7 @@ #pragma once +#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/common/watchdog_timer.hpp" #include @@ -24,6 +25,7 @@ #include #include +#include #include #include @@ -51,6 +53,15 @@ class RobosenseDecoderWrapper nebula::Status Status(); private: + struct PublishData + { + nebula_msgs::msg::NebulaPackets::UniquePtr packets; + drivers::NebulaPointCloudPtr cloud; + double cloud_timestamp_s; + }; + + void publish(PublishData && data); + void PublishCloud( std::unique_ptr pointcloud, const rclcpp::Publisher::SharedPtr & publisher); @@ -69,17 +80,20 @@ class RobosenseDecoderWrapper std::shared_ptr hw_interface_; std::shared_ptr sensor_cfg_; - std::shared_ptr calibration_cfg_ptr_{}; + std::shared_ptr calibration_cfg_ptr_; std::shared_ptr driver_ptr_; std::mutex mtx_driver_ptr_; - rclcpp::Publisher::SharedPtr packets_pub_{}; - robosense_msgs::msg::RobosenseScan::UniquePtr current_scan_msg_{}; + rclcpp::Publisher::SharedPtr packets_pub_; + nebula_msgs::msg::NebulaPackets::UniquePtr current_scan_msg_; + + mt_queue publish_queue_; + std::thread pub_thread_; - rclcpp::Publisher::SharedPtr nebula_points_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_ex_pub_{}; - rclcpp::Publisher::SharedPtr aw_points_base_pub_{}; + rclcpp::Publisher::SharedPtr nebula_points_pub_; + rclcpp::Publisher::SharedPtr aw_points_ex_pub_; + rclcpp::Publisher::SharedPtr aw_points_base_pub_; std::shared_ptr cloud_watchdog_; }; diff --git a/nebula_ros/include/nebula_ros/robosense/robosense_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/robosense/robosense_ros_wrapper.hpp index 0c66e0f9d..7f7f6765a 100644 --- a/nebula_ros/include/nebula_ros/robosense/robosense_ros_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/robosense/robosense_ros_wrapper.hpp @@ -14,7 +14,6 @@ #pragma once -#include "nebula_ros/common/mt_queue.hpp" #include "nebula_ros/robosense/decoder_wrapper.hpp" #include "nebula_ros/robosense/hw_interface_wrapper.hpp" #include "nebula_ros/robosense/hw_monitor_wrapper.hpp" @@ -83,14 +82,9 @@ class RobosenseRosWrapper final : public rclcpp::Node nebula::Status wrapper_status_; - std::shared_ptr sensor_cfg_ptr_{}; + std::shared_ptr sensor_cfg_ptr_; - /// @brief Stores received packets that have not been processed yet by the decoder thread - mt_queue> packet_queue_; - /// @brief Thread to isolate decoding from receiving - std::thread decoder_thread_; - - rclcpp::Subscription::SharedPtr packets_sub_{}; + rclcpp::Subscription::SharedPtr packets_sub_; bool launch_hw_; diff --git a/nebula_ros/src/robosense/decoder_wrapper.cpp b/nebula_ros/src/robosense/decoder_wrapper.cpp index c7cae7d3a..3650ec518 100644 --- a/nebula_ros/src/robosense/decoder_wrapper.cpp +++ b/nebula_ros/src/robosense/decoder_wrapper.cpp @@ -19,7 +19,9 @@ RobosenseDecoderWrapper::RobosenseDecoderWrapper( hw_interface_(hw_interface), sensor_cfg_(config), calibration_cfg_ptr_(calibration), - driver_ptr_(new drivers::RobosenseDriver(config, calibration)) + driver_ptr_(new drivers::RobosenseDriver(config, calibration)), + current_scan_msg_(std::make_unique()), + publish_queue_(1) { status_ = driver_ptr_->GetStatus(); @@ -30,7 +32,6 @@ RobosenseDecoderWrapper::RobosenseDecoderWrapper( // Publish packets only if HW interface is connected if (hw_interface_) { - current_scan_msg_ = std::make_unique(); packets_pub_ = parent_node->create_publisher( "robosense_packets", rclcpp::SensorDataQoS()); } @@ -55,24 +56,29 @@ RobosenseDecoderWrapper::RobosenseDecoderWrapper( logger_, *parent_node->get_clock(), 5000, "Missed pointcloud output deadline"); }); + pub_thread_ = std::thread([&]() { + while (true) { + auto publish_data = publish_queue_.pop(); + publish(std::move(publish_data)); + } + }); + RCLCPP_INFO(logger_, "Initialized decoder wrapper."); } void RobosenseDecoderWrapper::ProcessCloudPacket( std::unique_ptr packet_msg) { + auto & packet = *packet_msg; + // Accumulate packets for recording only if someone is subscribed to the topic (for performance) - if ( - hw_interface_ && (packets_pub_->get_subscription_count() > 0 || - packets_pub_->get_intra_process_subscription_count() > 0)) { + if (hw_interface_) { if (current_scan_msg_->packets.size() == 0) { current_scan_msg_->header.stamp = packet_msg->stamp; + current_scan_msg_->header.frame_id = sensor_cfg_->frame_id; } - robosense_msgs::msg::RobosensePacket robosense_packet_msg{}; - robosense_packet_msg.stamp = packet_msg->stamp; - std::copy(packet_msg->data.begin(), packet_msg->data.end(), robosense_packet_msg.data.begin()); - current_scan_msg_->packets.emplace_back(std::move(robosense_packet_msg)); + packet = current_scan_msg_->packets.emplace_back(std::move(*packet_msg)); } std::tuple pointcloud_ts{}; @@ -80,7 +86,7 @@ void RobosenseDecoderWrapper::ProcessCloudPacket( { std::lock_guard lock(mtx_driver_ptr_); - pointcloud_ts = driver_ptr_->ParseCloudPacket(packet_msg->data); + pointcloud_ts = driver_ptr_->ParseCloudPacket(packet.data); pointcloud = std::get<0>(pointcloud_ts); } @@ -88,12 +94,45 @@ void RobosenseDecoderWrapper::ProcessCloudPacket( return; } + publish_queue_.try_push({std::move(current_scan_msg_), pointcloud, std::get<1>(pointcloud_ts)}); + current_scan_msg_ = std::make_unique(); +} + +void RobosenseDecoderWrapper::OnConfigChange( + const std::shared_ptr & new_config) +{ + std::lock_guard lock(mtx_driver_ptr_); + auto new_driver = std::make_shared(new_config, calibration_cfg_ptr_); + driver_ptr_ = new_driver; + sensor_cfg_ = new_config; +} + +/// @brief Get current status of this driver +/// @return Current status +nebula::Status RobosenseDecoderWrapper::Status() +{ + return status_; +} + +void RobosenseDecoderWrapper::publish(PublishData && data) +{ + auto pointcloud = data.cloud; + auto header_stamp = rclcpp::Time(SecondsToChronoNanoSeconds(data.cloud_timestamp_s).count()); + cloud_watchdog_->update(); // Publish scan message only if it has been written to if (current_scan_msg_ && !current_scan_msg_->packets.empty()) { - packets_pub_->publish(std::move(current_scan_msg_)); - current_scan_msg_ = std::make_unique(); + auto robosense_scan = std::make_unique(); + robosense_scan->header = data.packets->header; + + for (const auto & pkt : data.packets->packets) { + auto & robosense_pkt = robosense_scan->packets.emplace_back(); + robosense_pkt.stamp = pkt.stamp; + std::copy(pkt.data.begin(), pkt.data.end(), robosense_pkt.data.begin()); + } + + packets_pub_->publish(std::move(robosense_scan)); } if ( @@ -101,8 +140,7 @@ void RobosenseDecoderWrapper::ProcessCloudPacket( nebula_points_pub_->get_intra_process_subscription_count() > 0) { auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*pointcloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), nebula_points_pub_); } if ( @@ -112,39 +150,21 @@ void RobosenseDecoderWrapper::ProcessCloudPacket( nebula::drivers::convertPointXYZIRCAEDTToPointXYZIR(pointcloud); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_cloud_xyzi, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_base_pub_); } if ( aw_points_ex_pub_->get_subscription_count() > 0 || aw_points_ex_pub_->get_intra_process_subscription_count() > 0) { - const auto autoware_ex_cloud = nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT( - pointcloud, std::get<1>(pointcloud_ts)); + const auto autoware_ex_cloud = + nebula::drivers::convertPointXYZIRCAEDTToPointXYZIRADT(pointcloud, data.cloud_timestamp_s); auto ros_pc_msg_ptr = std::make_unique(); pcl::toROSMsg(*autoware_ex_cloud, *ros_pc_msg_ptr); - ros_pc_msg_ptr->header.stamp = - rclcpp::Time(SecondsToChronoNanoSeconds(std::get<1>(pointcloud_ts)).count()); + ros_pc_msg_ptr->header.stamp = header_stamp; PublishCloud(std::move(ros_pc_msg_ptr), aw_points_ex_pub_); } } -void RobosenseDecoderWrapper::OnConfigChange( - const std::shared_ptr & new_config) -{ - std::lock_guard lock(mtx_driver_ptr_); - auto new_driver = std::make_shared(new_config, calibration_cfg_ptr_); - driver_ptr_ = new_driver; - sensor_cfg_ = new_config; -} - -/// @brief Get current status of this driver -/// @return Current status -nebula::Status RobosenseDecoderWrapper::Status() -{ - return status_; -} - void RobosenseDecoderWrapper::PublishCloud( std::unique_ptr pointcloud, const rclcpp::Publisher::SharedPtr & publisher) diff --git a/nebula_ros/src/robosense/robosense_ros_wrapper.cpp b/nebula_ros/src/robosense/robosense_ros_wrapper.cpp index 65c5ec8a9..305dbdb41 100644 --- a/nebula_ros/src/robosense/robosense_ros_wrapper.cpp +++ b/nebula_ros/src/robosense/robosense_ros_wrapper.cpp @@ -13,11 +13,7 @@ namespace ros RobosenseRosWrapper::RobosenseRosWrapper(const rclcpp::NodeOptions & options) : rclcpp::Node("robosense_ros_wrapper", rclcpp::NodeOptions(options).use_intra_process_comms(true)), wrapper_status_(Status::NOT_INITIALIZED), - sensor_cfg_ptr_(nullptr), - packet_queue_(3000), - hw_interface_wrapper_(), - hw_monitor_wrapper_(), - decoder_wrapper_() + sensor_cfg_ptr_(nullptr) { setvbuf(stdout, NULL, _IONBF, BUFSIZ); @@ -40,12 +36,6 @@ RobosenseRosWrapper::RobosenseRosWrapper(const rclcpp::NodeOptions & options) RCLCPP_DEBUG(get_logger(), "Starting stream"); - decoder_thread_ = std::thread([this]() { - while (true) { - decoder_wrapper_->ProcessCloudPacket(packet_queue_.pop()); - } - }); - if (launch_hw_) { hw_interface_wrapper_->HwInterface()->RegisterScanCallback( std::bind(&RobosenseRosWrapper::ReceiveCloudPacketCallback, this, std::placeholders::_1)); @@ -144,7 +134,7 @@ void RobosenseRosWrapper::ReceiveScanMessageCallback( nebula_pkt_ptr->stamp = pkt.stamp; std::copy(pkt.data.begin(), pkt.data.end(), std::back_inserter(nebula_pkt_ptr->data)); - packet_queue_.push(std::move(nebula_pkt_ptr)); + decoder_wrapper_->ProcessCloudPacket(std::move(nebula_pkt_ptr)); } } @@ -281,9 +271,7 @@ void RobosenseRosWrapper::ReceiveCloudPacketCallback(std::vector & pack msg_ptr->stamp.nanosec = static_cast(timestamp_ns % 1'000'000'000); msg_ptr->data.swap(packet); - if (!packet_queue_.try_push(std::move(msg_ptr))) { - RCLCPP_ERROR_THROTTLE(get_logger(), *get_clock(), 500, "Packet(s) dropped"); - } + decoder_wrapper_->ProcessCloudPacket(std::move(msg_ptr)); } RCLCPP_COMPONENTS_REGISTER_NODE(RobosenseRosWrapper) From 989b01818d840fd0bafec2f9aef4551c73e8d25a Mon Sep 17 00:00:00 2001 From: Max SCHMELLER Date: Tue, 23 Jul 2024 16:29:58 +0900 Subject: [PATCH 5/5] fix: correct packet output condition for velodyne and robosense --- nebula_ros/src/robosense/decoder_wrapper.cpp | 2 +- nebula_ros/src/velodyne/decoder_wrapper.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nebula_ros/src/robosense/decoder_wrapper.cpp b/nebula_ros/src/robosense/decoder_wrapper.cpp index 3650ec518..987151223 100644 --- a/nebula_ros/src/robosense/decoder_wrapper.cpp +++ b/nebula_ros/src/robosense/decoder_wrapper.cpp @@ -122,7 +122,7 @@ void RobosenseDecoderWrapper::publish(PublishData && data) cloud_watchdog_->update(); // Publish scan message only if it has been written to - if (current_scan_msg_ && !current_scan_msg_->packets.empty()) { + if (!data.packets->packets.empty()) { auto robosense_scan = std::make_unique(); robosense_scan->header = data.packets->header; diff --git a/nebula_ros/src/velodyne/decoder_wrapper.cpp b/nebula_ros/src/velodyne/decoder_wrapper.cpp index 88eaf99e1..05d69913b 100644 --- a/nebula_ros/src/velodyne/decoder_wrapper.cpp +++ b/nebula_ros/src/velodyne/decoder_wrapper.cpp @@ -207,7 +207,7 @@ void VelodyneDecoderWrapper::publish(PublishData && data) cloud_watchdog_->update(); // Publish scan message only if it has been written to - if (current_scan_msg_ && !current_scan_msg_->packets.empty()) { + if (data.packets->packets.empty()) { auto velodyne_scan = std::make_unique(); velodyne_scan->header = data.packets->header;