Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ COPY src/ros2_medkit_plugins/ ${COLCON_WS}/src/ros2_medkit_plugins/
RUN bash -c "source /opt/ros/${ROS_DISTRO}/setup.bash && \
rosdep update && \
rosdep install --from-paths src --ignore-src -r -y \
--skip-keys='ament_cmake_clang_format ament_cmake_clang_tidy test_msgs sqlite3 rosbag2_storage_mcap' && \
--skip-keys='ament_cmake_clang_format ament_cmake_clang_tidy test_msgs sqlite3 libcpp-httplib-dev rosbag2_storage_mcap' && \
colcon build --cmake-args -DBUILD_TESTING=OFF"

# ============================================================================
Expand Down
5 changes: 5 additions & 0 deletions src/ros2_medkit_gateway/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,11 @@ if(BUILD_TESTING)
target_link_libraries(test_plugin_loader gateway_lib)
medkit_target_dependencies(test_plugin_loader ament_index_cpp)

# Plugin config from YAML tests
ament_add_gtest(test_plugin_config test/test_plugin_config.cpp)
target_link_libraries(test_plugin_config gateway_lib)
medkit_set_test_domain(test_plugin_config)

# Plugin manager tests
ament_add_gtest(test_plugin_manager test/test_plugin_manager.cpp)
target_link_libraries(test_plugin_manager gateway_lib)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ class NativeTopicSampler {
/// Native JSON serializer for topic deserialization
std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer_;

/// Dedicated callback group for sampling subscriptions. Isolates short-lived
/// sampling subscriptions from the main callback group so subscription
/// creation/destruction from httplib threads does not race with the executor
/// iterating the default group's entity list.
rclcpp::CallbackGroup::SharedPtr sampling_cb_group_;

/// Serializes subscription creation/destruction in sample_topic(). Only one
/// sampling subscription is active at a time, preventing concurrent
/// create_generic_subscription calls from multiple httplib threads.
std::mutex sampling_mutex_;

/// Cached component topic map, updated by build_component_topic_map()
std::map<std::string, ComponentTopics> topic_map_cache_;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2026 bburda
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>

#include <rcl/arguments.h>
#include <rcl_yaml_param_parser/parser.h>
#include <rclcpp/rclcpp.hpp>

namespace ros2_medkit_gateway {

/// Declare plugin config parameters from the global --params-file YAML.
///
/// Parameters from --params-file go into the ROS 2 global rcl context,
/// NOT into NodeOptions::parameter_overrides(). We must discover and
/// declare them explicitly so list_parameters()/get_parameter() can find them.
inline void declare_plugin_params_from_yaml(rclcpp::Node * node, const std::string & prefix,
const std::string & path_key = "") {
auto rcl_ctx = node->get_node_base_interface()->get_context()->get_rcl_context();
rcl_params_t * global_params = nullptr;
auto ret = rcl_arguments_get_param_overrides(&rcl_ctx->global_arguments, &global_params);
if (ret != RCL_RET_OK || global_params == nullptr) {
return;
}
auto cleanup = [](rcl_params_t * p) {
if (p) {
rcl_yaml_node_struct_fini(p);
}
};
std::unique_ptr<rcl_params_t, decltype(cleanup)> guard(global_params, cleanup);

std::string node_name = node->get_name();
std::string node_fqn = node->get_fully_qualified_name();
for (size_t n = 0; n < global_params->num_nodes; ++n) {
std::string yaml_node = global_params->node_names[n];
if (yaml_node != node_name && yaml_node != node_fqn && yaml_node != "/**") {
continue;
}
auto * node_p = &global_params->params[n];
for (size_t p = 0; p < node_p->num_params; ++p) {
std::string pname = node_p->parameter_names[p];
if (pname.rfind(prefix, 0) == 0 && pname != path_key && !node->has_parameter(pname)) {
auto & val = node_p->parameter_values[p];
try {
if (val.string_value != nullptr) {
node->declare_parameter(pname, std::string(val.string_value));
} else if (val.bool_value != nullptr) {
node->declare_parameter(pname, *val.bool_value);
} else if (val.integer_value != nullptr) {
node->declare_parameter(pname, static_cast<int64_t>(*val.integer_value));
} else if (val.double_value != nullptr) {
node->declare_parameter(pname, *val.double_value);
} else {
RCLCPP_WARN(node->get_logger(), "Skipping param '%s': unsupported type (array?)", pname.c_str());
}
} catch (const std::exception & e) {
RCLCPP_DEBUG(node->get_logger(), "Could not declare param '%s': %s", pname.c_str(), e.what());
}
}
}
}
}

} // namespace ros2_medkit_gateway
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ class TriggerManager {
/// Called by GatewayNode after both TriggerManager and LogManager are available.
void set_log_manager(LogManager * log_manager);

/// Resolves a resource_path to a full ROS 2 topic name for a given entity.
/// Returns the topic name, or empty string if unresolvable.
using ResolveTopicFn = std::function<std::string(const std::string & entity_id, const std::string & resource_path)>;

/// Set the topic name resolver. Called by GatewayNode after cache is available.
void set_resolve_topic_fn(ResolveTopicFn fn);

/// Retry resolving data triggers whose topic names were unknown at creation.
/// Called periodically by TriggerTopicSubscriber's retry timer.
void retry_unresolved_triggers();

// --- Entity existence check (for orphan sweep) ----------------------------

/// Function that checks whether an entity still exists in the discovery cache.
Expand Down Expand Up @@ -255,6 +266,18 @@ class TriggerManager {
// that would re-enter on_resource_change(). Only accessed from the notifier
// worker thread (single-threaded dispatch), so no synchronization needed.
bool evaluating_trigger_{false};

// Deferred resolution for data triggers created before topic was discoverable.
// Periodically retried via retry_unresolved_triggers().
struct UnresolvedTrigger {
std::string trigger_id;
std::string entity_id;
std::string resource_path;
std::chrono::steady_clock::time_point created_at;
};
std::vector<UnresolvedTrigger> unresolved_data_triggers_; // guarded by triggers_mutex_
ResolveTopicFn resolve_topic_fn_; // guarded by triggers_mutex_
static constexpr int kUnresolvedTimeoutSec = 60;
};

} // namespace ros2_medkit_gateway
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class TriggerTopicSubscriber {
*/
void shutdown();

/// Callback invoked by the retry timer alongside topic type resolution.
/// Used to let TriggerManager re-resolve triggers whose topic names
/// were unknown at creation time.
using RetryCallback = std::function<void()>;
void set_retry_callback(RetryCallback cb);

private:
/// Per-topic subscription state with multi-entity support.
struct SubscriptionEntry {
Expand Down Expand Up @@ -144,6 +150,7 @@ class TriggerTopicSubscriber {
std::unordered_map<std::string, SubscriptionEntry> subscriptions_;
std::unordered_map<std::string, PendingSubscription> pending_; ///< Topics awaiting type resolution
rclcpp::TimerBase::SharedPtr retry_timer_;
RetryCallback retry_callback_;
std::atomic<bool> shutdown_flag_{false};
};

Expand Down
51 changes: 44 additions & 7 deletions src/ros2_medkit_gateway/src/gateway_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <unordered_set>

#include "ros2_medkit_gateway/aggregation/network_utils.hpp"
#include "ros2_medkit_gateway/param_utils.hpp"

#include "ros2_medkit_gateway/http/handlers/sse_transport_provider.hpp"
#include "ros2_medkit_gateway/sqlite_trigger_store.hpp"
Expand Down Expand Up @@ -59,20 +60,36 @@ nlohmann::json parameter_to_json(const rclcpp::Parameter & param) {
}
}

/// Extract per-plugin config from YAML parameter overrides.
/// Extract per-plugin config from ROS 2 parameters.
/// Scans for keys matching "plugins.<name>.<key>" (excluding ".path")
/// and builds a flat JSON object: {"<key>": value, ...}
nlohmann::json extract_plugin_config(const std::vector<rclcpp::Parameter> & overrides,
const std::string & plugin_name) {
///
/// Checks two sources:
/// 1. NodeOptions::parameter_overrides (set programmatically, e.g. in unit tests)
/// 2. Global YAML overrides from --params-file (declared on-demand, production path)
nlohmann::json extract_plugin_config(rclcpp::Node * node, const std::string & plugin_name) {
auto config = nlohmann::json::object();
std::string prefix = "plugins." + plugin_name + ".";
std::string path_key = prefix + "path";

for (const auto & param : overrides) {
// Source 1: NodeOptions parameter_overrides (programmatic, used in tests)
for (const auto & param : node->get_node_options().parameter_overrides()) {
const auto & name = param.get_name();
if (name.rfind(prefix, 0) == 0 && name != path_key) {
auto key = name.substr(prefix.size());
config[key] = parameter_to_json(param);
config[name.substr(prefix.size())] = parameter_to_json(param);
}
}
if (!config.empty()) {
return config;
}

// Source 2: global YAML overrides from --params-file
declare_plugin_params_from_yaml(node, prefix, path_key);

auto result = node->list_parameters({prefix}, 10);
for (const auto & name : result.names) {
if (name != path_key) {
config[name.substr(prefix.size())] = parameter_to_json(node->get_parameter(name));
}
}
return config;
Expand Down Expand Up @@ -601,7 +618,7 @@ GatewayNode::GatewayNode(const rclcpp::NodeOptions & options) : Node("ros2_medki
RCLCPP_ERROR(get_logger(), "Plugin '%s' has no path configured", pname.c_str());
continue;
}
auto plugin_config = extract_plugin_config(get_node_options().parameter_overrides(), pname);
auto plugin_config = extract_plugin_config(this, pname);
if (!plugin_config.empty()) {
RCLCPP_INFO(get_logger(), "Plugin '%s' config: %zu key(s)", pname.c_str(), plugin_config.size());
}
Expand Down Expand Up @@ -849,6 +866,26 @@ GatewayNode::GatewayNode(const rclcpp::NodeOptions & options) : Node("ros2_medki
trigger_topic_subscriber_ = std::make_unique<TriggerTopicSubscriber>(this, *resource_change_notifier_);
trigger_mgr_->set_topic_subscriber(trigger_topic_subscriber_.get());

// Wire deferred topic resolution: when a trigger's resource_path couldn't
// be resolved to a ROS 2 topic at creation time, retry periodically using
// the entity cache to find matching topics.
trigger_mgr_->set_resolve_topic_fn([this](const std::string & entity_id,
const std::string & resource_path) -> std::string {
const auto & cache = get_thread_safe_cache();
auto aggregated = cache.get_entity_data(entity_id);
for (const auto & topic : aggregated.topics) {
if (topic.name == resource_path ||
(topic.name.size() > resource_path.size() &&
topic.name.compare(topic.name.size() - resource_path.size(), resource_path.size(), resource_path) == 0)) {
return topic.name;
}
}
return "";
});
trigger_topic_subscriber_->set_retry_callback([this]() {
trigger_mgr_->retry_unresolved_triggers();
});

// Wire node-to-entity resolver for trigger notifications (#305).
// The resolver looks up node FQNs in the ThreadSafeEntityCache's node_to_app
// mapping (populated from linking result). Captures 'this' - the lambda is
Expand Down
9 changes: 8 additions & 1 deletion src/ros2_medkit_gateway/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ int main(int argc, char ** argv) {

auto node = std::make_shared<ros2_medkit_gateway::GatewayNode>();

rclcpp::spin(node);
// MultiThreadedExecutor is required because cpp-httplib handler threads call
// Node::create_generic_subscription() for topic sampling. SingleThreadedExecutor
// does not synchronize external subscription creation with its internal iteration,
// causing non-deterministic SIGSEGV on rolling. All gateway callbacks are already
// protected by mutexes (EntityCache, LogManager, TriggerManager, etc.).
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();

rclcpp::shutdown();
return 0;
Expand Down
42 changes: 30 additions & 12 deletions src/ros2_medkit_gateway/src/native_topic_sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ NativeTopicSampler::NativeTopicSampler(rclcpp::Node * node)
if (!node_) {
throw std::invalid_argument("NativeTopicSampler requires a valid node pointer");
}
// Dedicated callback group isolates sampling subscriptions from the default
// group. This prevents subscription create/destroy from httplib threads
// racing with the executor iterating the default group's entity list.
sampling_cb_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
RCLCPP_INFO(node_->get_logger(), "NativeTopicSampler initialized with native serialization");
}

Expand Down Expand Up @@ -238,6 +242,11 @@ TopicSampleResult NativeTopicSampler::sample_topic(const std::string & topic_nam
return result;
}

// Serialize sampling: only one subscription active at a time.
// This prevents concurrent create_generic_subscription calls from multiple
// httplib threads racing with each other and with the executor.
std::lock_guard<std::mutex> sampling_lock(sampling_mutex_);

// Use native GenericSubscription for sampling
try {
constexpr double kDefaultTimeoutSec = 1.0;
Expand All @@ -248,10 +257,16 @@ TopicSampleResult NativeTopicSampler::sample_topic(const std::string & topic_nam
timeout_sec = kDefaultTimeoutSec;
}

// Set up one-shot subscription with promise/future
std::promise<rclcpp::SerializedMessage> message_promise;
auto message_future = message_promise.get_future();
std::atomic<bool> received{false};
// Heap-allocated state shared between the callback and this function.
// The callback captures a shared_ptr (not stack references) so it remains
// valid even if the subscription outlives sample_topic()'s stack frame -
// e.g. when the executor dispatches a callback after wait_for() times out.
struct SampleState {
std::promise<rclcpp::SerializedMessage> message_promise;
std::atomic<bool> received{false};
};
auto state = std::make_shared<SampleState>();
auto message_future = state->message_promise.get_future();

// Choose QoS based on publisher QoS (best effort for sensor data, reliable for others)
rclcpp::QoS qos = rclcpp::SensorDataQoS(); // Best effort, keep last 1
Expand All @@ -266,17 +281,19 @@ TopicSampleResult NativeTopicSampler::sample_topic(const std::string & topic_nam

// Create generic subscription
// NOLINTNEXTLINE(performance-unnecessary-value-param) - GenericSubscription requires value type in callback
auto callback = [&message_promise, &received](std::shared_ptr<const rclcpp::SerializedMessage> msg) {
auto callback = [state](std::shared_ptr<const rclcpp::SerializedMessage> msg) {
// Only process first message (one-shot)
bool expected = false;
if (received.compare_exchange_strong(expected, true)) {
message_promise.set_value(*msg);
if (state->received.compare_exchange_strong(expected, true)) {
state->message_promise.set_value(*msg);
}
};

rclcpp::GenericSubscription::SharedPtr subscription;
try {
subscription = node_->create_generic_subscription(topic_name, info->type, qos, callback);
rclcpp::SubscriptionOptions sub_opts;
sub_opts.callback_group = sampling_cb_group_;
subscription = node_->create_generic_subscription(topic_name, info->type, qos, callback, sub_opts);
} catch (const std::exception & e) {
RCLCPP_WARN(node_->get_logger(), "Failed to create subscription for '%s': %s", topic_name.c_str(), e.what());
result.has_data = false;
Expand All @@ -285,10 +302,11 @@ TopicSampleResult NativeTopicSampler::sample_topic(const std::string & topic_nam

RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Created GenericSubscription for '%s'", topic_name.c_str());

// Wait for message using future with timeout
// The main executor will deliver callbacks - we just wait for the result
// Note: This works because HTTP requests are handled in a separate thread,
// while the main executor processes ROS callbacks
// Wait for message using future with timeout.
// The main executor delivers callbacks on the executor thread; we block here.
// Thread safety: sampling_mutex_ serializes subscription lifecycle,
// sampling_cb_group_ isolates from the default callback group, and
// SampleState (heap-allocated, shared_ptr) keeps callback state alive.
const auto timeout = std::chrono::duration<double>(timeout_sec);
auto future_status = message_future.wait_for(timeout);

Expand Down
Loading
Loading