diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index e16e3928..cc9764a1 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -16,6 +16,7 @@ from pathlib import Path import threading +import time from typing import List from typing import Optional @@ -108,6 +109,77 @@ def parse(cls, entity: Entity, parser: Parser): return cls, kwargs + def _resend_service_call_if_timeout( + self, + response_future, + event: threading.Event, + attempt_started: float, + timeout_sec: float, + request: composition_interfaces.srv.LoadNode.Request, + context: LaunchContext, + retry_count: int = 0, + max_retries: int = 10 + ) -> tuple: + """ + Resend service call if timeout occurred. + + :param response_future: current response future + :param event: threading event for synchronization + :param attempt_started: timestamp when attempt started + :param timeout_sec: timeout duration in seconds + :param request: service request to resend + :param context: current launch context + :param retry_count: current retry attempt count + :param max_retries: maximum number of retry attempts + :return: tuple of (new_response_future, new_event, new_attempt_started, new_retry_count) + """ + if (time.monotonic() - attempt_started) >= timeout_sec: + node_ident = request.node_name if request.node_name else '' + if retry_count >= max_retries: + self.__logger.error( + "Maximum retry attempts ({}) exceeded when loading node '{}' " + "(package='{}', plugin='{}') in container '{}'. 
Giving up.".format( + max_retries, node_ident, request.package_name, request.plugin_name, + self.__final_target_container_name + ) + ) + raise RuntimeError( + "Failed to load composable node '{}' (package='{}', plugin='{}') " + "in container '{}' after {} retry attempts".format( + node_ident, request.package_name, request.plugin_name, + self.__final_target_container_name, max_retries + ) + ) + + service_ready = self.__rclpy_load_node_client.service_is_ready() + self.__logger.warning( + "No response from '{}' for {}s when loading node '{}' " + "(package='{}', plugin='{}') in container '{}'; " + "resending service call (attempt {}/{}, service available: {}).".format( + self.__rclpy_load_node_client.srv_name, timeout_sec, + node_ident, request.package_name, request.plugin_name, + self.__final_target_container_name, + retry_count + 1, max_retries, service_ready + ) + ) + response_future.cancel() + + # Reset event and unblock callback + new_event = threading.Event() + + def unblock(future): + nonlocal new_event + new_event.set() + + new_response_future = self.__rclpy_load_node_client.call_async(request) + new_response_future.add_done_callback(unblock) + new_attempt_started = time.monotonic() + new_retry_count = retry_count + 1 + + return new_response_future, new_event, new_attempt_started, new_retry_count + + return response_future, event, attempt_started, retry_count + def _load_node( self, request: composition_interfaces.srv.LoadNode.Request, @@ -143,6 +215,10 @@ def unblock(future): response_future = self.__rclpy_load_node_client.call_async(request) response_future.add_done_callback(unblock) + attempt_started = time.monotonic() + # maximum wait time per attempt (seconds) + timeout_sec = 30.0 + retry_count = 0 while not event.wait(1.0): if context.is_shutdown: @@ -153,6 +229,11 @@ def unblock(future): response_future.cancel() return + # Resend if no response for timeout_sec + response_future, event, attempt_started, retry_count = self._resend_service_call_if_timeout( 
def test_load_node_timeout_max_retries_error_message():
    """Test that RuntimeError on max retries includes node identity for debugging."""
    action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='pointcloud_preprocessor',
                plugin='pointcloud_preprocessor::RingOutlierFilterComponent',
                name='ring_outlier_filter',
                namespace='sensing',
            )
        ]
    )

    # Set private attributes required by _resend_service_call_if_timeout
    mock_client = MagicMock()
    mock_client.srv_name = 'pointcloud_container/_container/load_node'
    mock_client.service_is_ready.return_value = True
    action._LoadComposableNodes__rclpy_load_node_client = mock_client
    action._LoadComposableNodes__final_target_container_name = 'pointcloud_container'

    request = LoadNode.Request()
    request.package_name = 'pointcloud_preprocessor'
    request.plugin_name = 'pointcloud_preprocessor::RingOutlierFilterComponent'
    request.node_name = 'ring_outlier_filter'
    request.node_namespace = '/sensing'

    mock_future = MagicMock()
    mock_event = threading.Event()
    context = LaunchContext()

    # 100s elapsed since attempt_started=0.0, well past the 30s timeout
    with patch('time.monotonic', return_value=100.0), \
            pytest.raises(RuntimeError) as exc_info:
        action._resend_service_call_if_timeout(
            response_future=mock_future,
            event=mock_event,
            attempt_started=0.0,
            timeout_sec=30.0,
            request=request,
            context=context,
            retry_count=10,
            max_retries=10,
        )

    err_msg = str(exc_info.value)
    assert "node 'ring_outlier_filter'" in err_msg
    assert "package='pointcloud_preprocessor'" in err_msg
    assert "plugin='pointcloud_preprocessor::RingOutlierFilterComponent'" in err_msg
    assert "in container 'pointcloud_container'" in err_msg
    assert '10 retry attempts' in err_msg

    # Giving up must not send yet another request
    mock_client.call_async.assert_not_called()


def test_load_node_timeout_max_retries_error_message_unnamed_node():
    """Test RuntimeError message when node_name is empty (unnamed node)."""
    action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='my_pkg',
                plugin='my_plugin::MyComponent',
            )
        ]
    )

    mock_client = MagicMock()
    mock_client.srv_name = 'container/_container/load_node'
    mock_client.service_is_ready.return_value = False
    action._LoadComposableNodes__rclpy_load_node_client = mock_client
    action._LoadComposableNodes__final_target_container_name = 'container'

    request = LoadNode.Request()
    request.package_name = 'my_pkg'
    request.plugin_name = 'my_plugin::MyComponent'
    request.node_name = ''
    request.node_namespace = '/'

    mock_future = MagicMock()
    mock_event = threading.Event()
    context = LaunchContext()

    with patch('time.monotonic', return_value=100.0), \
            pytest.raises(RuntimeError) as exc_info:
        action._resend_service_call_if_timeout(
            response_future=mock_future,
            event=mock_event,
            attempt_started=0.0,
            timeout_sec=30.0,
            request=request,
            context=context,
            retry_count=10,
            max_retries=10,
        )

    err_msg = str(exc_info.value)
    # A bare `'' in err_msg` or `'container' in err_msg` is always true (the
    # template itself contains the word "container"), so match the formatted
    # fields including their surrounding quotes instead.
    assert "composable node ''" in err_msg
    assert "package='my_pkg'" in err_msg
    assert "plugin='my_plugin::MyComponent'" in err_msg
    assert "in container 'container'" in err_msg

    mock_client.call_async.assert_not_called()


def test_load_node_timeout_retry_warning_message():
    """Test that retry path calls service_is_ready and resends the request."""
    action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='foo_package',
                plugin='bar_plugin',
                name='test_node_name',
            )
        ]
    )

    mock_client = MagicMock()
    mock_client.srv_name = 'mock_component_container/_container/load_node'
    mock_client.service_is_ready.return_value = False
    mock_future = MagicMock()
    mock_client.call_async.return_value = mock_future
    action._LoadComposableNodes__rclpy_load_node_client = mock_client
    action._LoadComposableNodes__final_target_container_name = 'mock_component_container'

    request = LoadNode.Request()
    request.package_name = 'foo_package'
    request.plugin_name = 'bar_plugin'
    request.node_name = 'test_node_name'
    request.node_namespace = '/'

    mock_response_future = MagicMock()
    original_event = threading.Event()
    context = LaunchContext()

    with patch('time.monotonic', return_value=100.0):
        result = action._resend_service_call_if_timeout(
            response_future=mock_response_future,
            event=original_event,
            attempt_started=0.0,
            timeout_sec=30.0,
            request=request,
            context=context,
            retry_count=0,
            max_retries=10,
        )

    new_future, new_event, new_attempt_started, new_retry_count = result

    # Should have retried (returned new future, event, etc.)
    assert new_retry_count == 1
    assert new_future is mock_future
    assert new_attempt_started == 100.0
    assert isinstance(new_event, threading.Event)
    assert new_event is not original_event
    assert not new_event.is_set()

    # Verify service_is_ready was called for diagnostic logging
    mock_client.service_is_ready.assert_called_once()

    # Verify call_async was invoked to resend the request
    mock_client.call_async.assert_called_once_with(request)

    # Verify original future was cancelled
    mock_response_future.cancel.assert_called_once()

    # Verify completion of the new future unblocks the new event only
    mock_future.add_done_callback.assert_called_once()
    done_callback = mock_future.add_done_callback.call_args[0][0]
    done_callback(mock_future)
    assert new_event.is_set()
    assert not original_event.is_set()


def test_load_node_timeout_not_elapsed_keeps_current_attempt():
    """Test that nothing is resent while the current attempt is within its timeout."""
    action = LoadComposableNodes(
        target_container=f'/{TEST_CONTAINER_NAME}',
        composable_node_descriptions=[
            ComposableNode(
                package='foo_package',
                plugin='bar_plugin',
                name='test_node_name',
            )
        ]
    )

    mock_client = MagicMock()
    mock_client.srv_name = 'mock_component_container/_container/load_node'
    action._LoadComposableNodes__rclpy_load_node_client = mock_client
    action._LoadComposableNodes__final_target_container_name = 'mock_component_container'

    request = LoadNode.Request()
    request.package_name = 'foo_package'
    request.plugin_name = 'bar_plugin'
    request.node_name = 'test_node_name'
    request.node_namespace = '/'

    pending_future = MagicMock()
    pending_event = threading.Event()
    context = LaunchContext()

    # Only 10s elapsed since attempt_started=0.0, below the 30s timeout
    with patch('time.monotonic', return_value=10.0):
        result = action._resend_service_call_if_timeout(
            response_future=pending_future,
            event=pending_event,
            attempt_started=0.0,
            timeout_sec=30.0,
            request=request,
            context=context,
            retry_count=3,
            max_retries=10,
        )

    kept_future, kept_event, kept_attempt_started, kept_retry_count = result
    assert kept_future is pending_future
    assert kept_event is pending_event
    assert kept_attempt_started == 0.0
    assert kept_retry_count == 3

    pending_future.cancel.assert_not_called()
    mock_client.call_async.assert_not_called()
    mock_client.service_is_ready.assert_not_called()