From a4be35f4ee05f3e295c2bc8b4981f7900334474a Mon Sep 17 00:00:00 2001 From: Noel Jimenez Date: Thu, 5 Mar 2026 14:12:38 +0100 Subject: [PATCH] Implement onParametersSet for handling only analyzers node parameters The previous implementation is using a callback that is invoked for all the parameter events of all ROS 2 nodes and it filters by node name. It could lead to race conditions when setting parameters depending on the depth size of the callback. With this new approach, the callback is only triggered when setting parameters for the analyzers node, avoiding possible race conditions when setting the parameters. --- .../diagnostic_aggregator/aggregator.hpp | 15 ++++++---- diagnostic_aggregator/src/aggregator.cpp | 28 ++++++++++++++----- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp index 1d2c7e638..785c34118 100644 --- a/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp +++ b/diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp @@ -39,6 +39,7 @@ #ifndef DIAGNOSTIC_AGGREGATOR__AGGREGATOR_HPP_ #define DIAGNOSTIC_AGGREGATOR__AGGREGATOR_HPP_ +#include #include #include #include @@ -140,8 +141,10 @@ class Aggregator rclcpp::Service::SharedPtr add_srv_; /// DiagnosticArray, /diagnostics rclcpp::Subscription::SharedPtr diag_sub_; - /// ParameterEvent, /parameter_events - rclcpp::Subscription::SharedPtr param_sub_; + /// On-set parameters callback (node-scoped, no filtering needed) + rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr param_cb_handle_; + /// Flag set by onParametersSet, consumed by publishData on the next cycle + std::atomic reinit_needed_{false}; /// DiagnosticArray, /diagnostics_agg rclcpp::Publisher::SharedPtr agg_pub_; /// DiagnosticStatus, /diagnostics_toplevel_state @@ -174,10 +177,12 @@ class Aggregator /// Records all ROS warnings. No warnings are repeated. std::set ros_warnings_; - /* - *!\brief Checks for new parameters to trigger reinitialization of the AnalyzerGroup and OtherAnalyzer + /*! + *\brief Validates incoming parameter changes and schedules reinitialization + * when new parameters are detected. Only fires for this node's own parameters. */ - void parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr param_msg); + rcl_interfaces::msg::SetParametersResult onParametersSet( + const std::vector & parameters); /* *!\brief (re)initializes the AnalyzerGroup and OtherAnalyzer diff --git a/diagnostic_aggregator/src/aggregator.cpp b/diagnostic_aggregator/src/aggregator.cpp index 6b60eee32..5ae840035 100644 --- a/diagnostic_aggregator/src/aggregator.cpp +++ b/diagnostic_aggregator/src/aggregator.cpp @@ -87,18 +87,25 @@ Aggregator::Aggregator(rclcpp::NodeOptions options) std::chrono::milliseconds(publish_rate_ms), std::bind(&Aggregator::publishData, this)); - param_sub_ = n_->create_subscription( - "/parameter_events", 1, std::bind(&Aggregator::parameterCallback, this, _1)); + param_cb_handle_ = n_->add_on_set_parameters_callback( + std::bind(&Aggregator::onParametersSet, this, _1)); } -void Aggregator::parameterCallback(const rcl_interfaces::msg::ParameterEvent::SharedPtr msg) +rcl_interfaces::msg::SetParametersResult Aggregator::onParametersSet( + const std::vector & parameters) { - if (msg->node == "/" + std::string(n_->get_name())) { - if (msg->new_parameters.size() != 0) { - base_path_ = ""; - initAnalyzers(); + // Check if any of the incoming parameters are new. If so, flag for reinitialization. + // The method publishData() will pick it up on the next publish cycle and call initAnalyzers(). + for (const auto & p : parameters) { + if (!n_->has_parameter(p.get_name())) { + reinit_needed_.store(true); + break; } } + + rcl_interfaces::msg::SetParametersResult result; + result.successful = true; + return result; } void Aggregator::initAnalyzers() @@ -211,6 +218,13 @@ Aggregator::~Aggregator() void Aggregator::publishData() { RCLCPP_DEBUG(logger_, "publishData()"); + + // Check if reinitialization is needed because new parameters have been set. + if (reinit_needed_.exchange(false)) { + base_path_ = ""; + initAnalyzers(); + } + DiagnosticArray diag_array; DiagnosticStatus diag_toplevel_state; diag_toplevel_state.name = "toplevel_state";