Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions launch_ros/launch_ros/actions/load_composable_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from pathlib import Path
import threading
import time

from typing import List
from typing import Optional
Expand Down Expand Up @@ -108,6 +109,77 @@ def parse(cls, entity: Entity, parser: Parser):

return cls, kwargs

def _resend_service_call_if_timeout(
    self,
    response_future,
    event: threading.Event,
    attempt_started: float,
    timeout_sec: float,
    request: composition_interfaces.srv.LoadNode.Request,
    context: LaunchContext,
    retry_count: int = 0,
    max_retries: int = 10
) -> tuple:
    """
    Resend the load-node service call if the current attempt has timed out.

    If less than ``timeout_sec`` has elapsed since ``attempt_started``, the
    arguments are handed back unchanged so the caller keeps waiting on the
    current attempt. Otherwise the current future is cancelled and the same
    request is sent again with a fresh event, unless the retry budget is
    already exhausted.

    :param response_future: future of the attempt currently being waited on
    :param event: threading event that is set when ``response_future`` completes
    :param attempt_started: ``time.monotonic()`` timestamp of the current attempt
    :param timeout_sec: maximum time to wait for one attempt, in seconds
    :param request: service request to resend
    :param context: current launch context (currently unused by this method)
    :param retry_count: number of resends already performed
    :param max_retries: maximum number of resends before giving up
    :return: tuple of (response_future, event, attempt_started, retry_count);
        the inputs unchanged when no timeout occurred, new values after a resend
    :raises RuntimeError: if the attempt timed out and ``retry_count`` has
        already reached ``max_retries``
    """
    if (time.monotonic() - attempt_started) < timeout_sec:
        # Still inside the allowed window: nothing to do for this attempt.
        return response_future, event, attempt_started, retry_count

    node_ident = request.node_name if request.node_name else '<unnamed>'

    if retry_count >= max_retries:
        # Cancel the outstanding request before giving up so that the
        # abandoned future is not left pending after the error is raised.
        response_future.cancel()
        self.__logger.error(
            "Maximum retry attempts ({}) exceeded when loading node '{}' "
            "(package='{}', plugin='{}') in container '{}'. Giving up.".format(
                max_retries, node_ident, request.package_name, request.plugin_name,
                self.__final_target_container_name
            )
        )
        raise RuntimeError(
            "Failed to load composable node '{}' (package='{}', plugin='{}') "
            "in container '{}' after {} retry attempts".format(
                node_ident, request.package_name, request.plugin_name,
                self.__final_target_container_name, max_retries
            )
        )

    # Service availability is only reported in the log to aid debugging;
    # the request is resent regardless of its value.
    service_ready = self.__rclpy_load_node_client.service_is_ready()
    self.__logger.warning(
        "No response from '{}' for {}s when loading node '{}' "
        "(package='{}', plugin='{}') in container '{}'; "
        "resending service call (attempt {}/{}, service available: {}).".format(
            self.__rclpy_load_node_client.srv_name, timeout_sec,
            node_ident, request.package_name, request.plugin_name,
            self.__final_target_container_name,
            retry_count + 1, max_retries, service_ready
        )
    )
    response_future.cancel()

    # Use a fresh event for the new attempt so that a late completion of the
    # cancelled future cannot wake the caller on behalf of the new request.
    new_event = threading.Event()

    def unblock(future):
        new_event.set()

    new_response_future = self.__rclpy_load_node_client.call_async(request)
    new_response_future.add_done_callback(unblock)
    new_attempt_started = time.monotonic()

    return new_response_future, new_event, new_attempt_started, retry_count + 1

def _load_node(
self,
request: composition_interfaces.srv.LoadNode.Request,
Expand Down Expand Up @@ -143,6 +215,10 @@ def unblock(future):

response_future = self.__rclpy_load_node_client.call_async(request)
response_future.add_done_callback(unblock)
attempt_started = time.monotonic()
# maximum wait time per attempt (seconds)
timeout_sec = 30.0
retry_count = 0

while not event.wait(1.0):
if context.is_shutdown:
Expand All @@ -153,6 +229,11 @@ def unblock(future):
response_future.cancel()
return

# Resend if no response for timeout_sec
response_future, event, attempt_started, retry_count = self._resend_service_call_if_timeout(
response_future, event, attempt_started, timeout_sec, request, context, retry_count
)

# Get response
if response_future.exception() is not None:
raise response_future.exception()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

import pathlib
import threading
from unittest.mock import MagicMock
from unittest.mock import patch

from composition_interfaces.srv import LoadNode

from launch import LaunchDescription
from launch import LaunchService
from launch.actions import GroupAction
from launch.conditions import IfCondition
from launch.launch_context import LaunchContext
from launch_ros.actions import LoadComposableNodes
from launch_ros.actions import PushROSNamespace
from launch_ros.actions import SetRemap
Expand Down Expand Up @@ -662,3 +665,152 @@ def test_load_node_with_condition_in_group(mock_component_container):
assert len(request.remap_rules) == 0
assert len(request.parameters) == 0
assert len(request.extra_arguments) == 0

def test_load_node_timeout_max_retries_error_message():
    """Check that the give-up RuntimeError identifies the node that failed to load."""
    load_action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='pointcloud_preprocessor',
                plugin='pointcloud_preprocessor::RingOutlierFilterComponent',
                name='ring_outlier_filter',
                namespace='sensing',
            )
        ]
    )

    # The method under test reads these name-mangled private attributes,
    # so they are injected directly instead of executing the action.
    fake_client = MagicMock()
    fake_client.srv_name = 'pointcloud_container/_container/load_node'
    fake_client.service_is_ready.return_value = True
    load_action._LoadComposableNodes__rclpy_load_node_client = fake_client
    load_action._LoadComposableNodes__final_target_container_name = 'pointcloud_container'

    load_request = LoadNode.Request()
    load_request.package_name = 'pointcloud_preprocessor'
    load_request.plugin_name = 'pointcloud_preprocessor::RingOutlierFilterComponent'
    load_request.node_name = 'ring_outlier_filter'
    load_request.node_namespace = '/sensing'

    # Retry budget already used up (retry_count == max_retries).
    call_kwargs = {
        'response_future': MagicMock(),
        'event': threading.Event(),
        'attempt_started': 0.0,
        'timeout_sec': 30.0,
        'request': load_request,
        'context': LaunchContext(),
        'retry_count': 10,
        'max_retries': 10,
    }

    # A frozen clock at 100.0 puts the attempt well past the 30 s timeout.
    with patch('time.monotonic', return_value=100.0), \
            pytest.raises(RuntimeError) as raised:
        load_action._resend_service_call_if_timeout(**call_kwargs)

    message = str(raised.value)
    expected_fragments = (
        'ring_outlier_filter',
        'pointcloud_preprocessor',
        'RingOutlierFilterComponent',
        'pointcloud_container',
        '10 retry attempts',
    )
    for fragment in expected_fragments:
        assert fragment in message


def test_load_node_timeout_max_retries_error_message_unnamed_node():
    """Test RuntimeError message when node_name is empty (unnamed node)."""
    action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='my_pkg',
                plugin='my_plugin::MyComponent',
            )
        ]
    )

    # Set private attributes required by _resend_service_call_if_timeout
    mock_client = MagicMock()
    mock_client.srv_name = 'container/_container/load_node'
    mock_client.service_is_ready.return_value = False
    action._LoadComposableNodes__rclpy_load_node_client = mock_client
    action._LoadComposableNodes__final_target_container_name = 'container'

    request = LoadNode.Request()
    request.package_name = 'my_pkg'
    request.plugin_name = 'my_plugin::MyComponent'
    request.node_name = ''
    request.node_namespace = '/'

    with patch('time.monotonic', return_value=100.0):
        with pytest.raises(RuntimeError) as exc_info:
            action._resend_service_call_if_timeout(
                response_future=MagicMock(),
                event=threading.Event(),
                attempt_started=0.0,
                timeout_sec=30.0,
                request=request,
                context=LaunchContext(),
                retry_count=10,
                max_retries=10,
            )

    err_msg = str(exc_info.value)
    # Match each value together with its surrounding field text: the message
    # template itself contains words such as "container", so bare substring
    # checks would pass even if the value were missing.
    assert "node '<unnamed>'" in err_msg
    assert "package='my_pkg'" in err_msg
    assert "plugin='my_plugin::MyComponent'" in err_msg
    assert "in container 'container'" in err_msg

    # Giving up must not send the request again
    mock_client.call_async.assert_not_called()


def test_load_node_timeout_retry_warning_message():
    """Test that retry path calls service_is_ready and resends the request."""
    action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='foo_package',
                plugin='bar_plugin',
                name='test_node_name',
            )
        ]
    )

    # Set private attributes required by _resend_service_call_if_timeout
    mock_client = MagicMock()
    mock_client.srv_name = 'mock_component_container/_container/load_node'
    mock_client.service_is_ready.return_value = False
    mock_future = MagicMock()
    mock_client.call_async.return_value = mock_future
    action._LoadComposableNodes__rclpy_load_node_client = mock_client
    action._LoadComposableNodes__final_target_container_name = 'mock_component_container'

    request = LoadNode.Request()
    request.package_name = 'foo_package'
    request.plugin_name = 'bar_plugin'
    request.node_name = 'test_node_name'
    request.node_namespace = '/'

    mock_response_future = MagicMock()
    original_event = threading.Event()

    with patch('time.monotonic', return_value=100.0):
        result = action._resend_service_call_if_timeout(
            response_future=mock_response_future,
            event=original_event,
            attempt_started=0.0,
            timeout_sec=30.0,
            request=request,
            context=LaunchContext(),
            retry_count=0,
            max_retries=10,
        )

    new_future, new_event, new_attempt_started, new_retry_count = result

    # Should have retried: new future, fresh unset event, restarted clock
    assert new_retry_count == 1
    assert new_future is mock_future
    assert new_event is not original_event
    assert not new_event.is_set()
    assert new_attempt_started == 100.0

    # Verify service_is_ready was called for diagnostic logging
    mock_client.service_is_ready.assert_called_once()

    # Verify call_async was invoked to resend the request
    mock_client.call_async.assert_called_once_with(request)

    # Verify original future was cancelled
    mock_response_future.cancel.assert_called_once()

    # Verify completion of the resent request wakes up the new event
    mock_future.add_done_callback.assert_called_once()
    done_callback = mock_future.add_done_callback.call_args[0][0]
    done_callback(mock_future)
    assert new_event.is_set()