diff --git a/nebula_hw_interfaces/src/nebula_velodyne_hw_interfaces/velodyne_hw_interface.cpp b/nebula_hw_interfaces/src/nebula_velodyne_hw_interfaces/velodyne_hw_interface.cpp index 1f70ee674..7382b14a2 100644 --- a/nebula_hw_interfaces/src/nebula_velodyne_hw_interfaces/velodyne_hw_interface.cpp +++ b/nebula_hw_interfaces/src/nebula_velodyne_hw_interfaces/velodyne_hw_interface.cpp @@ -107,16 +107,20 @@ Status VelodyneHwInterface::get_sensor_configuration(SensorConfigurationBase & s VelodyneStatus VelodyneHwInterface::init_http_client() { - try { - http_client_driver_->init_client(sensor_configuration_->sensor_ip, 80); - if (!http_client_driver_->client()->isOpen()) { - http_client_driver_->client()->open(); + while (true) { + try { + http_client_driver_->init_client(sensor_configuration_->sensor_ip, 80); + if (!http_client_driver_->client()->isOpen()) { + http_client_driver_->client()->open(); + } + return Status::OK; + } catch (const std::exception & ex) { + RCLCPP_INFO_ONCE( + *parent_node_logger_, "Cannot connect to lidar because of: %s. Will keep trying...", + ex.what()); + std::this_thread::sleep_for(std::chrono::seconds(5)); } - } catch (const std::exception & ex) { - VelodyneStatus status = Status::HTTP_CONNECTION_ERROR; - return status; } - return Status::OK; } void VelodyneHwInterface::string_callback(const std::string & str) diff --git a/nebula_ros/CMakeLists.txt b/nebula_ros/CMakeLists.txt index aafb8d96a..8e52b11e4 100644 --- a/nebula_ros/CMakeLists.txt +++ b/nebula_ros/CMakeLists.txt @@ -3,7 +3,7 @@ project(nebula_ros) # Default to C++17 if (NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 17) + set(CMAKE_CXX_STANDARD 20) endif () if (CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") diff --git a/nebula_ros/include/nebula_ros/common/mt_queue.hpp b/nebula_ros/include/nebula_ros/common/mt_queue.hpp index 40f26253d..46125b518 100644 --- a/nebula_ros/include/nebula_ros/common/mt_queue.hpp +++ b/nebula_ros/include/nebula_ros/common/mt_queue.hpp @@ -46,7 +46,7 @@ class MtQueue void push(T && value) { std::unique_lock lock(this->mutex_); - this->cv_not_full_.wait(lock, [=] { return this->queue_.size() < this->capacity_; }); + this->cv_not_full_.wait(lock, [this] { return this->queue_.size() < this->capacity_; }); queue_.push_front(std::move(value)); this->cv_not_empty_.notify_all(); } @@ -54,11 +54,24 @@ class MtQueue T pop() { std::unique_lock lock(this->mutex_); - this->cv_not_empty_.wait(lock, [=] { return !this->queue_.empty(); }); + this->cv_not_empty_.wait(lock, [this] { return !this->queue_.empty(); }); T return_value(std::move(this->queue_.back())); this->queue_.pop_back(); this->cv_not_full_.notify_all(); return return_value; } + + std::pair pop(std::chrono::milliseconds timeout) + { + std::unique_lock lock(this->mutex_); + if (!this->cv_not_empty_.wait_for(lock, timeout, [this] { return !this->queue_.empty(); })) { + return std::make_pair(T(), false); + } + T return_value(std::move(this->queue_.back())); + this->queue_.pop_back(); + this->cv_not_full_.notify_all(); + + return std::make_pair(std::move(return_value), true); + } }; diff --git a/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp b/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp index 9f7fe8758..0943539ab 100644 --- a/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp +++ b/nebula_ros/include/nebula_ros/velodyne/velodyne_ros_wrapper.hpp @@ -69,6 +69,21 @@ class VelodyneRosWrapper final : public rclcpp::Node Status declare_and_get_sensor_config_params(); + void reconfigure_hw_interface(); + + void create_packet_subscriber(); + void reset_packet_subscriber(); + + // HW interface management + void bringup_hw(bool); + void cleanup_on_hw_reconfigure(); + void setup_decoder(); + + // Decoder thread management + void decoder_wrapper_thread(std::stop_token stoken); + void set_decoder_wrapper(); + void stop_decoder_thread() { decoder_thread_.request_stop(); } + /// @brief rclcpp parameter callback /// @param parameters Received parameters /// @return SetParametersResult @@ -85,15 +100,20 @@ class VelodyneRosWrapper final : public rclcpp::Node /// @brief Stores received packets that have not been processed yet by the decoder thread MtQueue> packet_queue_; /// @brief Thread to isolate decoding from receiving - std::thread decoder_thread_; + std::jthread decoder_thread_; rclcpp::Subscription::SharedPtr packets_sub_{}; bool launch_hw_; + bool use_udp_only_; + + bool restart_hw_ = false; + bool restart_packet_subscriber_ = false; std::optional hw_interface_wrapper_; std::optional hw_monitor_wrapper_; std::optional decoder_wrapper_; + rclcpp::TimerBase::SharedPtr hw_reconfigure_timer_; std::mutex mtx_config_; diff --git a/nebula_ros/src/velodyne/decoder_wrapper.cpp b/nebula_ros/src/velodyne/decoder_wrapper.cpp index 3a86fbe61..140f52dcc 100644 --- a/nebula_ros/src/velodyne/decoder_wrapper.cpp +++ b/nebula_ros/src/velodyne/decoder_wrapper.cpp @@ -31,8 +31,12 @@ VelodyneDecoderWrapper::VelodyneDecoderWrapper( "VelodyneDecoderWrapper cannot be instantiated without a valid config!"); } - calibration_file_path_ = - parent_node->declare_parameter("calibration_file", param_read_write()); + if (parent_node->has_parameter("calibration_file")) { + calibration_file_path_ = parent_node->get_parameter("calibration_file").as_string(); + } else { + calibration_file_path_ = + parent_node->declare_parameter("calibration_file", param_read_write()); + } auto calibration_result = get_calibration_data(calibration_file_path_); if (!calibration_result.has_value()) { diff --git a/nebula_ros/src/velodyne/hw_interface_wrapper.cpp b/nebula_ros/src/velodyne/hw_interface_wrapper.cpp index 4e2b7202d..a04d9eb73 100644 --- a/nebula_ros/src/velodyne/hw_interface_wrapper.cpp +++ b/nebula_ros/src/velodyne/hw_interface_wrapper.cpp @@ -17,8 +17,11 @@ VelodyneHwInterfaceWrapper::VelodyneHwInterfaceWrapper( status_(Status::NOT_INITIALIZED), use_udp_only_(use_udp_only) { - setup_sensor_ = parent_node->declare_parameter("setup_sensor", param_read_only()); - + if (parent_node->has_parameter("setup_sensor")) { + setup_sensor_ = parent_node->get_parameter("setup_sensor").as_bool(); + } else { + setup_sensor_ = parent_node->declare_parameter("setup_sensor", param_read_only()); + } hw_interface_->set_logger( std::make_shared(parent_node->get_logger().get_child("HwInterface"))); status_ = hw_interface_->initialize_sensor_configuration(config); diff --git a/nebula_ros/src/velodyne/hw_monitor_wrapper.cpp b/nebula_ros/src/velodyne/hw_monitor_wrapper.cpp index 3c4179802..c8c59a2ed 100644 --- a/nebula_ros/src/velodyne/hw_monitor_wrapper.cpp +++ b/nebula_ros/src/velodyne/hw_monitor_wrapper.cpp @@ -23,9 +23,17 @@ VelodyneHwMonitorWrapper::VelodyneHwMonitorWrapper( parent_node_(parent_node), sensor_configuration_(config) { - diag_span_ = parent_node->declare_parameter("diag_span", param_read_only()); - show_advanced_diagnostics_ = - parent_node->declare_parameter("advanced_diagnostics", param_read_only()); + if (parent_node->has_parameter("diag_span")) { + diag_span_ = parent_node->get_parameter("diag_span").as_int(); + } else { + diag_span_ = parent_node->declare_parameter("diag_span", param_read_only()); + } + if (parent_node->has_parameter("advanced_diagnostics")) { + show_advanced_diagnostics_ = parent_node->get_parameter("advanced_diagnostics").as_bool(); + } else { + show_advanced_diagnostics_ = + parent_node->declare_parameter("advanced_diagnostics", param_read_only()); + } std::cout << "Get model name and serial." << std::endl; auto str = hw_interface_->get_snapshot(); diff --git a/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp b/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp index dc415d219..bd0d08000 100644 --- a/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp +++ b/nebula_ros/src/velodyne/velodyne_ros_wrapper.cpp @@ -4,6 +4,8 @@ #include +#include // debug only + #include #include #include @@ -22,8 +24,11 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options) packet_queue_(3000), hw_interface_wrapper_(), hw_monitor_wrapper_(), - decoder_wrapper_() + decoder_wrapper_(), + hw_reconfigure_timer_(this->create_wall_timer( + std::chrono::seconds(1), std::bind(&VelodyneRosWrapper::reconfigure_hw_interface, this))) { + hw_reconfigure_timer_->cancel(); setvbuf(stdout, NULL, _IONBF, BUFSIZ); wrapper_status_ = declare_and_get_sensor_config_params(); @@ -34,10 +39,10 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options) RCLCPP_INFO_STREAM(get_logger(), "Sensor Configuration: " << *sensor_cfg_ptr_); - launch_hw_ = declare_parameter("launch_hw", param_read_only()); - bool use_udp_only = declare_parameter("udp_only", param_read_only()); + launch_hw_ = declare_parameter("launch_hw", param_read_write()); + use_udp_only_ = declare_parameter("udp_only", param_read_only()); - if (use_udp_only) { + if (use_udp_only_) { RCLCPP_INFO_STREAM( get_logger(), "UDP-only mode is enabled. Settings checks, synchronization, and diagnostics publishing are " @@ -45,40 +50,81 @@ VelodyneRosWrapper::VelodyneRosWrapper(const rclcpp::NodeOptions & options) } if (launch_hw_) { - hw_interface_wrapper_.emplace(this, sensor_cfg_ptr_, use_udp_only); - if (!use_udp_only) { // hardware monitor requires HTTP connection - hw_monitor_wrapper_.emplace(this, hw_interface_wrapper_->hw_interface(), sensor_cfg_ptr_); - } + bringup_hw(use_udp_only_); + } else { + create_packet_subscriber(); + } + + setup_decoder(); + + // Register parameter callback after all params have been declared. Otherwise it would be called + // once for each declaration + parameter_event_cb_ = add_on_set_parameters_callback( + std::bind(&VelodyneRosWrapper::on_parameter_change, this, std::placeholders::_1)); +} + +void VelodyneRosWrapper::reconfigure_hw_interface() +{ + if (restart_hw_) { + bringup_hw(use_udp_only_); + setup_decoder(); + restart_hw_ = false; + hw_reconfigure_timer_->cancel(); + } + + if (restart_packet_subscriber_) { + create_packet_subscriber(); + setup_decoder(); + restart_packet_subscriber_ = false; + hw_reconfigure_timer_->cancel(); } +} +void VelodyneRosWrapper::create_packet_subscriber() +{ + packets_sub_ = create_subscription( + "velodyne_packets", rclcpp::SensorDataQoS(), + std::bind(&VelodyneRosWrapper::receive_scan_message_callback, this, std::placeholders::_1)); + RCLCPP_INFO_STREAM( + get_logger(), + "Hardware connection disabled, listening for packets on " << packets_sub_->get_topic_name()); +} + +void VelodyneRosWrapper::set_decoder_wrapper() +{ + // If there's an existing decoder wrapper, reset it + if (decoder_wrapper_) { + decoder_wrapper_.reset(); + } decoder_wrapper_.emplace( this, hw_interface_wrapper_ ? hw_interface_wrapper_->hw_interface() : nullptr, sensor_cfg_ptr_); +} +void VelodyneRosWrapper::decoder_wrapper_thread(std::stop_token stoken) +{ RCLCPP_DEBUG(get_logger(), "Starting stream"); - decoder_thread_ = std::thread([this]() { - while (true) { - decoder_wrapper_->process_cloud_packet(packet_queue_.pop()); + while (!stoken.stop_requested()) { + auto [packet, valid] = packet_queue_.pop(std::chrono::milliseconds(100)); + if (valid) { + decoder_wrapper_->process_cloud_packet(std::move(packet)); + } else { + continue; } - }); - - if (launch_hw_) { - hw_interface_wrapper_->hw_interface()->register_scan_callback( - std::bind(&VelodyneRosWrapper::receive_cloud_packet_callback, this, std::placeholders::_1)); - stream_start(); - } else { - packets_sub_ = create_subscription( - "velodyne_packets", rclcpp::SensorDataQoS(), - std::bind(&VelodyneRosWrapper::receive_scan_message_callback, this, std::placeholders::_1)); - RCLCPP_INFO_STREAM( - get_logger(), - "Hardware connection disabled, listening for packets on " << packets_sub_->get_topic_name()); } + RCLCPP_INFO(get_logger(), "Gracefully stopped decoder thread"); +} - // Register parameter callback after all params have been declared. Otherwise it would be called - // once for each declaration - parameter_event_cb_ = add_on_set_parameters_callback( - std::bind(&VelodyneRosWrapper::on_parameter_change, this, std::placeholders::_1)); +void VelodyneRosWrapper::bringup_hw(bool use_udp_only) +{ + hw_interface_wrapper_.emplace(this, sensor_cfg_ptr_, use_udp_only); + + if (!use_udp_only) { // hardware monitor requires HTTP connection + hw_monitor_wrapper_.emplace(this, hw_interface_wrapper_->hw_interface(), sensor_cfg_ptr_); + } + hw_interface_wrapper_->hw_interface()->register_scan_callback( + std::bind(&VelodyneRosWrapper::receive_cloud_packet_callback, this, std::placeholders::_1)); + stream_start(); } nebula::Status VelodyneRosWrapper::declare_and_get_sensor_config_params() @@ -194,6 +240,38 @@ Status VelodyneRosWrapper::stream_start() return hw_interface_wrapper_->hw_interface()->sensor_interface_start(); } +void VelodyneRosWrapper::cleanup_on_hw_reconfigure() +{ + while (true) { + if (decoder_thread_.joinable()) { + stop_decoder_thread(); + break; + } + } + decoder_thread_.join(); + if (hw_interface_wrapper_) { + hw_interface_wrapper_.reset(); + } + if (hw_monitor_wrapper_) { + hw_monitor_wrapper_.reset(); + } +} + +void VelodyneRosWrapper::setup_decoder() +{ + set_decoder_wrapper(); + decoder_thread_ = std::jthread(&VelodyneRosWrapper::decoder_wrapper_thread, this); + // Set the thread name + pthread_setname_np(decoder_thread_.native_handle(), "VelDecThread"); // debug only +} + +void VelodyneRosWrapper::reset_packet_subscriber() +{ + if (packets_sub_) { + packets_sub_.reset(); + } +} + rcl_interfaces::msg::SetParametersResult VelodyneRosWrapper::on_parameter_change( const std::vector & p) { @@ -216,7 +294,27 @@ rcl_interfaces::msg::SetParametersResult VelodyneRosWrapper::on_parameter_change get_param(p, "max_range", new_cfg.max_range) | get_param(p, "rotation_speed", new_cfg.rotation_speed) | get_param(p, "cloud_min_angle", new_cfg.cloud_min_angle) | - get_param(p, "cloud_max_angle", new_cfg.cloud_max_angle); + get_param(p, "cloud_max_angle", new_cfg.cloud_max_angle) | + get_param(p, "launch_hw", launch_hw_); + + if (got_any && launch_hw_ && !hw_interface_wrapper_) { + RCLCPP_INFO(get_logger(), "Cancelling packet subscription..."); + reset_packet_subscriber(); + RCLCPP_INFO(get_logger(), "Resetting HW interface and HW Monitor wrappers..."); + cleanup_on_hw_reconfigure(); + RCLCPP_INFO(get_logger(), "(re)Configuring HW interface"); + restart_hw_ = true; + hw_reconfigure_timer_->reset(); + } + + if (got_any && !launch_hw_ && hw_interface_wrapper_) { + RCLCPP_INFO(get_logger(), "Resetting HW interface and HW Monitor wrappers..."); + cleanup_on_hw_reconfigure(); + reset_packet_subscriber(); + RCLCPP_INFO(get_logger(), "(re) Configuring packet subscriber"); + restart_packet_subscriber_ = true; + hw_reconfigure_timer_->reset(); + } // Currently, HW interface and monitor wrappers have only read-only parameters, so their update // logic is not implemented