Conversation
Ros2 devel hunter config
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
|
@copilot in another consideration I developed this additional concept shown below. This complements your idea and in addition to lifecycle management and interrupting topics like cmd_vel it implements specific software guards for python and c++. Out together, we want to have a new package called Design ApproachThe key insight is to create a reusable Guard abstraction that monitors sentor's state and heartbeat, then implement it in multiple forms:
1. Python Guard Implementation# sentor_guard/guard.py
import rclpy
from rclpy.node import Node
from rclpy.time import Time, Duration
from std_msgs.msg import String, Header # Adjust based on sentor's actual messages
from threading import Event, Lock
import time
class Autonomy GuardException(Exception):
"""Raised when autonomy guard conditions are not met within timeout"""
pass
class SentorGuard:
"""
Guard that checks sentor state and heartbeat before allowing execution.
Can be used as a context manager or called directly.
"""
def __init__(self, node: Node,
state_topic: str = '/sentor/state',
heartbeat_topic: str = '/sentor/heartbeat',
heartbeat_timeout: float = 1.0,
required_state: str = 'autonomy'):
"""
Args:
node: ROS2 node to use for subscriptions
state_topic: Topic publishing sentor state
heartbeat_topic: Topic publishing sentor heartbeat
heartbeat_timeout: Maximum age of heartbeat in seconds
required_state: State required for autonomy
"""
self.node = node
self.heartbeat_timeout = Duration(seconds=heartbeat_timeout)
self.required_state = required_state
self._lock = Lock()
self._current_state = None
self._last_heartbeat_time = None
self._condition_met = Event()
# Subscribe to sentor topics
self._state_sub = node.create_subscription(
String, # Adjust to actual message type
state_topic,
self._state_callback,
10
)
self._heartbeat_sub = node.create_subscription(
Header, # Assuming heartbeat uses Header with timestamp
heartbeat_topic,
self._heartbeat_callback,
10
)
def _state_callback(self, msg):
with self._lock:
self._current_state = msg.data
self._check_conditions()
def _heartbeat_callback(self, msg):
with self._lock:
self._last_heartbeat_time = Time.from_msg(msg.stamp)
self._check_conditions()
def _check_conditions(self):
"""Check if both state and heartbeat conditions are met"""
if self._current_state != self.required_state:
self._condition_met.clear()
return
if self._last_heartbeat_time is None:
self._condition_met.clear()
return
now = self.node.get_clock().now()
age = now - self._last_heartbeat_time
if age > self.heartbeat_timeout:
self._condition_met.clear()
return
self._condition_met.set()
def is_autonomy_allowed(self) -> bool:
"""Check if autonomy is currently allowed (non-blocking)"""
with self._lock:
self._check_conditions() # Recheck heartbeat age
return self._condition_met.is_set()
def wait_for_autonomy(self, timeout: float = None) -> bool:
"""
Wait until autonomy is allowed.
Args:
timeout: Maximum time to wait in seconds. None for indefinite.
Returns:
True if autonomy is allowed, False if timeout occurred
"""
# Spin in background to process callbacks
start_time = time.time()
rate = self.node.create_rate(10) # 10 Hz check rate
while rclpy.ok():
if self.is_autonomy_allowed():
return True
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
return False
rclpy.spin_once(self.node, timeout_sec=0.1)
return False
def __enter__(self):
"""Context manager entry - waits indefinitely by default"""
if not self.wait_for_autonomy():
raise AutonomyGuardException("Autonomy not allowed and node is shutting down")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
return False
def guarded_wait(self, timeout: float = None):
"""
Wait for autonomy with timeout, raising exception on failure.
Args:
timeout: Maximum time to wait. None for indefinite.
Raises:
AutonomyGuardException: If timeout occurs
"""
if not self.wait_for_autonomy(timeout):
raise AutonomyGuardException(
f"Autonomy not granted within {timeout}s timeout"
)
# Example usage in a node
class ExampleRobotNode(Node):
def __init__(self):
super().__init__('example_robot')
self.guard = SentorGuard(self, heartbeat_timeout=1.0)
def do_autonomous_action(self):
# Method 1: Context manager (indefinite wait)
with self.guard:
self.get_logger().info("Executing autonomous action")
# Your autonomous code here
def do_timed_action(self):
# Method 2: Explicit timeout with exception
try:
self.guard.guarded_wait(timeout=5.0)
self.get_logger().info("Autonomy granted, proceeding")
# Your autonomous code here
except AutonomyGuardException as e:
self.get_logger().error(f"Cannot proceed: {e}")
def do_non_blocking_check(self):
# Method 3: Non-blocking check
if self.guard.is_autonomy_allowed():
# Proceed
pass
else:
# Handle not allowed
pass2. C++ Guard Implementation// sentor_guard/include/sentor_guard/guard.hpp
#pragma once
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <std_msgs/msg/header.hpp>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
namespace sentor_guard {
class AutonomyGuardException : public std::runtime_error {
public:
explicit AutonomyGuardException(const std::string& message)
: std::runtime_error(message) {}
};
class SentorGuard {
public:
SentorGuard(
rclcpp::Node::SharedPtr node,
const std::string& state_topic = "/sentor/state",
const std::string& heartbeat_topic = "/sentor/heartbeat",
double heartbeat_timeout = 1.0,
const std::string& required_state = "autonomy")
: node_(node),
heartbeat_timeout_(rclcpp::Duration::from_seconds(heartbeat_timeout)),
required_state_(required_state),
current_state_(""),
autonomy_allowed_(false)
{
state_sub_ = node_->create_subscription<std_msgs::msg::String>(
state_topic, 10,
std::bind(&SentorGuard::stateCallback, this, std::placeholders::_1));
heartbeat_sub_ = node_->create_subscription<std_msgs::msg::Header>(
heartbeat_topic, 10,
std::bind(&SentorGuard::heartbeatCallback, this, std::placeholders::_1));
}
bool isAutonomyAllowed() {
std::lock_guard<std::mutex> lock(mutex_);
checkConditions();
return autonomy_allowed_;
}
bool waitForAutonomy(double timeout_sec = -1.0) {
auto start = node_->get_clock()->now();
rclcpp::Duration timeout = timeout_sec < 0
? rclcpp::Duration::from_seconds(std::numeric_limits<double>::max())
: rclcpp::Duration::from_seconds(timeout_sec);
rclcpp::Rate rate(10); // 10 Hz check rate
while (rclcpp::ok()) {
if (isAutonomyAllowed()) {
return true;
}
if ((node_->get_clock()->now() - start) > timeout) {
return false;
}
rclcpp::spin_some(node_);
rate.sleep();
}
return false;
}
void guardedWait(double timeout_sec = -1.0) {
if (!waitForAutonomy(timeout_sec)) {
if (timeout_sec < 0) {
throw AutonomyGuardException("Node shutting down while waiting for autonomy");
} else {
throw AutonomyGuardException(
"Autonomy not granted within " + std::to_string(timeout_sec) + "s timeout");
}
}
}
// RAII wrapper for scoped guarding
class ScopedGuard {
public:
explicit ScopedGuard(SentorGuard& guard, double timeout = -1.0)
: guard_(guard) {
guard_.guardedWait(timeout);
}
~ScopedGuard() = default;
// Non-copyable
ScopedGuard(const ScopedGuard&) = delete;
ScopedGuard& operator=(const ScopedGuard&) = delete;
private:
SentorGuard& guard_;
};
ScopedGuard createScopedGuard(double timeout = -1.0) {
return ScopedGuard(*this, timeout);
}
private:
void stateCallback(const std_msgs::msg::String::SharedPtr msg) {
std::lock_guard<std::mutex> lock(mutex_);
current_state_ = msg->data;
checkConditions();
cv_.notify_all();
}
void heartbeatCallback(const std_msgs::msg::Header::SharedPtr msg) {
std::lock_guard<std::mutex> lock(mutex_);
last_heartbeat_time_ = rclcpp::Time(msg->stamp);
checkConditions();
cv_.notify_all();
}
void checkConditions() {
// Must be called with mutex locked
if (current_state_ != required_state_) {
autonomy_allowed_ = false;
return;
}
if (last_heartbeat_time_.nanoseconds() == 0) {
autonomy_allowed_ = false;
return;
}
auto now = node_->get_clock()->now();
auto age = now - last_heartbeat_time_;
if (age > heartbeat_timeout_) {
autonomy_allowed_ = false;
return;
}
autonomy_allowed_ = true;
}
rclcpp::Node::SharedPtr node_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr state_sub_;
rclcpp::Subscription<std_msgs::msg::Header>::SharedPtr heartbeat_sub_;
rclcpp::Duration heartbeat_timeout_;
std::string required_state_;
std::mutex mutex_;
std::condition_variable cv_;
std::string current_state_;
rclcpp::Time last_heartbeat_time_;
bool autonomy_allowed_;
};
// Example usage
class ExampleRobotNode : public rclcpp::Node {
public:
ExampleRobotNode() : Node("example_robot") {
guard_ = std::make_shared<SentorGuard>(
shared_from_this(),
"/sentor/state",
"/sentor/heartbeat",
1.0);
}
void doAutonomousAction() {
// Method 1: RAII scoped guard
auto scoped_guard = guard_->createScopedGuard(5.0); // 5s timeout
RCLCPP_INFO(get_logger(), "Executing autonomous action");
// Your code here
}
void doTimedAction() {
// Method 2: Explicit wait with exception handling
try {
guard_->guardedWait(5.0);
RCLCPP_INFO(get_logger(), "Autonomy granted");
// Your code here
} catch (const AutonomyGuardException& e) {
RCLCPP_ERROR(get_logger(), "Cannot proceed: %s", e.what());
}
}
private:
std::shared_ptr<SentorGuard> guard_;
};
} // namespace sentor_guard3. Topic Guard Node#!/usr/bin/env python3
# sentor_guard/topic_guard_node.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
import importlib
from sentor_guard.guard import SentorGuard
class TopicGuardNode(Node):
"""
Generic node that forwards messages from input_topic to output_topic
only when sentor guard conditions are met.
"""
def __init__(self):
super().__init__('topic_guard_node')
# Declare parameters
self.declare_parameter('input_topic', '/cmd_vel_unguarded')
self.declare_parameter('output_topic', '/cmd_vel')
self.declare_parameter('message_type', 'geometry_msgs/msg/Twist')
self.declare_parameter('heartbeat_timeout', 1.0)
self.declare_parameter('queue_size', 10)
self.declare_parameter('state_topic', '/sentor/state')
self.declare_parameter('heartbeat_topic', '/sentor/heartbeat')
self.declare_parameter('required_state', 'autonomy')
# Get parameters
input_topic = self.get_parameter('input_topic').value
output_topic = self.get_parameter('output_topic').value
msg_type_str = self.get_parameter('message_type').value
heartbeat_timeout = self.get_parameter('heartbeat_timeout').value
queue_size = self.get_parameter('queue_size').value
state_topic = self.get_parameter('state_topic').value
heartbeat_topic_param = self.get_parameter('heartbeat_topic').value
required_state = self.get_parameter('required_state').value
# Initialize guard
self.guard = SentorGuard(
self,
state_topic=state_topic,
heartbeat_topic=heartbeat_topic_param,
heartbeat_timeout=heartbeat_timeout,
required_state=required_state
)
# Dynamically import message type
# e.g., 'geometry_msgs/msg/Twist' -> geometry_msgs.msg.Twist
module_path, class_name = msg_type_str.rsplit('/', 1)
module_path = module_path.replace('/', '.')
module = importlib.import_module(module_path)
msg_class = getattr(module, class_name)
# Create publisher
self.publisher = self.create_publisher(
msg_class,
output_topic,
queue_size
)
# Create subscriber
self.subscription = self.create_subscription(
msg_class,
input_topic,
self.message_callback,
queue_size
)
self.get_logger().info(
f'Topic guard active: {input_topic} -> {output_topic} '
f'(type: {msg_type_str})'
)
# Statistics
self.msg_received = 0
self.msg_forwarded = 0
self.msg_blocked = 0
# Periodic stats reporting
self.create_timer(10.0, self.report_stats)
def message_callback(self, msg):
"""Forward message only if guard conditions are met"""
self.msg_received += 1
if self.guard.is_autonomy_allowed():
self.publisher.publish(msg)
self.msg_forwarded += 1
else:
self.msg_blocked += 1
if self.msg_blocked % 10 == 1: # Log every 10th blocked message
self.get_logger().warn(
'Blocking message - autonomy not allowed',
throttle_duration_sec=1.0
)
def report_stats(self):
"""Periodically report statistics"""
self.get_logger().info(
f'Stats - Received: {self.msg_received}, '
f'Forwarded: {self.msg_forwarded}, '
f'Blocked: {self.msg_blocked}'
)
def main(args=None):
rclpy.init(args=args)
node = TopicGuardNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()4. Launch File Example# launch/topic_guard.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
return LaunchDescription([
DeclareLaunchArgument(
'input_topic',
default_value='/cmd_vel_unguarded',
description='Input topic to monitor'
),
DeclareLaunchArgument(
'output_topic',
default_value='/cmd_vel',
description='Output topic to publish to when guarded'
),
Node(
package='sentor_guard',
executable='topic_guard_node',
name='cmd_vel_guard',
parameters=[{
'input_topic': LaunchConfiguration('input_topic'),
'output_topic': LaunchConfiguration('output_topic'),
'message_type': 'geometry_msgs/msg/Twist',
'heartbeat_timeout': 1.0,
'queue_size': 10,
'state_topic': '/sentor/state',
'heartbeat_topic': '/sentor/heartbeat',
'required_state': 'autonomy'
}],
output='screen'
)
])5. Package StructureKey Design Decisions
Usage Recommendations
|
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
I've incorporated your guard concept into the architecture. Created comprehensive Three safety mechanisms:
Key features:
Integration:
See docs/SENTOR_GUARD_DESIGN.md for full specification with complete implementations. Updated main architecture and summary docs to reference sentor_guard throughout. Commit: 8aec75a |
|
@copilot based on the now developed concept, make a prototype implementation of the sentor_guard package. |
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Prototype implementation complete. Created full sentor_guard package with: Python Library (
C++ Library (
Nodes:
Complete Package:
Ready to build with: colcon build --packages-select sentor_guardSee Commit: 5652f05 |
|
@copilot add to the python guard the ability to use an function annotation (like |
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
…tions handling, and fixing compile error
- Created CheckAutonomyAllowed BT condition node - Added BT plugin infrastructure with optional BehaviorTree.CPP dependency - Created comprehensive Nav2 integration examples - Added tests for BT condition node - Updated documentation Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
- Created test_bt_integration.py to demonstrate condition checking - Added simple_guard_demo.py showing guard usage in application code - Updated CMakeLists.txt to install demo scripts - Enhanced Nav2 examples README with testing documentation Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
- Created detailed integration documentation - Covers architecture, installation, usage, and troubleshooting - Includes advanced patterns and best practices - Provides complete testing procedures Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
…gement; modify package.xml to ensure behavior tree integration
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
…nstall_package Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
Co-authored-by: marc-hanheide <1153084+marc-hanheide@users.noreply.github.com>
…/github.com/LCAS/sentor into copilot/develop-concept-architecture-sentor
Task: Create Concept Architecture Documentation for Sentor-ROS2-Nav2 Integration
✅ GUARDSTATUS MESSAGE COMPLETE
Previous Deliverables:
1-12. Architecture documents + sentor_guard prototype + simplified architecture + BT fixes + blocking diagnostics + message fixes - see previous commits
Bug Fix (Code):
13. GuardStatus.msg File - NEW Added message definition file
Added the
msg/GuardStatus.msgfile that was missing from the previous commit.Message Definition:
This message provides detailed blocking diagnostics for monitoring guard behavior across the system.
<issue_title>Develop Concept Architecture for integration of new Sentor statemachine with ROS2.</issue_title>
Original prompt
💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.