From a6efef580633f4a4f8506f55b7a59d55e1bdbf14 Mon Sep 17 00:00:00 2001 From: Tomohito Ando Date: Wed, 27 Aug 2025 15:12:35 +0900 Subject: [PATCH 1/6] feat(load_composable_nodes): retry if no response after waiting 30 seconds when loading node Signed-off-by: Tomohito Ando --- .../actions/load_composable_nodes.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index e16e3928..2716e6b6 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -16,6 +16,7 @@ from pathlib import Path import threading +import time from typing import List from typing import Optional @@ -184,6 +185,68 @@ def unblock(future): ) ) + response_future = self.__rclpy_load_node_client.call_async(request) + response_future.add_done_callback(unblock) + attempt_started = time.monotonic() + # maximum wait time per attempt (seconds) + timeout_sec = 30.0 + + while not event.wait(1.0): + if context.is_shutdown: + self.__logger.warning( + "Abandoning wait for the '{}' service response, due to shutdown.".format( + self.__rclpy_load_node_client.srv_name), + ) + response_future.cancel() + return + + # Resend if no response for 10s + if (time.monotonic() - attempt_started) >= timeout_sec: + self.__logger.warning( + "No response from '{}' for {}s; resending service call.".format( + self.__rclpy_load_node_client.srv_name, timeout_sec + ) + ) + response_future.cancel() + + # Reset event and unblock callback + event = threading.Event() + + response_future = self.__rclpy_load_node_client.call_async(request) + response_future.add_done_callback(unblock) + attempt_started = time.monotonic() + + # Get response + if response_future.exception() is not None: + raise response_future.exception() + response = response_future.result() + + self.__logger.debug("Received response '{}'".format(response)) + + node_name = response.full_node_name if response.full_node_name else request.node_name + if response.success: + if node_name is not None: + add_node_name(context, node_name) + node_name_count = get_node_name_count(context, node_name) + if node_name_count > 1: + container_logger = launch.logging.get_logger( + self.__final_target_container_name + ) + container_logger.warning( + 'there are now at least {} nodes with the name {} created within this ' + 'launch context'.format(node_name_count, node_name) + ) + self.__logger.info("Loaded node '{}' in container '{}'".format( + response.full_node_name, self.__final_target_container_name + )) + else: + self.__logger.error( + "Failed to load node '{}' of type '{}' in container '{}': {}".format( + node_name, request.plugin_name, self.__final_target_container_name, + response.error_message + ) + ) + def _load_in_sequence( self, load_node_requests: List[composition_interfaces.srv.LoadNode.Request], From 4331252e161b9e4bdf42a86aaf72c40bc673eb6e Mon Sep 17 00:00:00 2001 From: Tomohito Ando Date: Mon, 1 Sep 2025 16:48:06 +0900 Subject: [PATCH 2/6] chore: fix comments Signed-off-by: Tomohito Ando --- launch_ros/launch_ros/actions/load_composable_nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index 2716e6b6..ad33d3ed 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -200,7 +200,7 @@ def unblock(future): response_future.cancel() return - # Resend if no response for 10s + # Resend if no response for timeout_sec if (time.monotonic() - attempt_started) >= timeout_sec: self.__logger.warning( "No response from '{}' for {}s; resending service call.".format( From 1b119d9c4387e726460712914e992f1234005441 Mon Sep 17 00:00:00 2001 From: Tomohito Ando Date: Tue, 2 Sep 2025 19:15:00 +0900 Subject: [PATCH 3/6] refactor: extract resend logic into separate method for better modularity Signed-off-by: Tomohito Ando --- .../actions/load_composable_nodes.py | 60 ++++++++++++++----- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index ad33d3ed..018ea4e9 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -109,6 +109,49 @@ def parse(cls, entity: Entity, parser: Parser): return cls, kwargs + def _resend_service_call_if_timeout( + self, + response_future, + event: threading.Event, + attempt_started: float, + timeout_sec: float, + request: composition_interfaces.srv.LoadNode.Request, + context: LaunchContext + ) -> tuple: + """ + Resend service call if timeout occurred. + + :param response_future: current response future + :param event: threading event for synchronization + :param attempt_started: timestamp when attempt started + :param timeout_sec: timeout duration in seconds + :param request: service request to resend + :param context: current launch context + :return: tuple of (new_response_future, new_event, new_attempt_started) + """ + if (time.monotonic() - attempt_started) >= timeout_sec: + self.__logger.warning( + "No response from '{}' for {}s; resending service call.".format( + self.__rclpy_load_node_client.srv_name, timeout_sec + ) + ) + response_future.cancel() + + # Reset event and unblock callback + new_event = threading.Event() + + def unblock(future): + nonlocal new_event + new_event.set() + + new_response_future = self.__rclpy_load_node_client.call_async(request) + new_response_future.add_done_callback(unblock) + new_attempt_started = time.monotonic() + + return new_response_future, new_event, new_attempt_started + + return response_future, event, attempt_started + def _load_node( self, request: composition_interfaces.srv.LoadNode.Request, @@ -201,20 +244,9 @@ def unblock(future): return # Resend if no response for timeout_sec - if (time.monotonic() - attempt_started) >= timeout_sec: - self.__logger.warning( - "No response from '{}' for {}s; resending service call.".format( - self.__rclpy_load_node_client.srv_name, timeout_sec - ) - ) - response_future.cancel() - - # Reset event and unblock callback - event = threading.Event() - - response_future = self.__rclpy_load_node_client.call_async(request) - response_future.add_done_callback(unblock) - attempt_started = time.monotonic() + response_future, event, attempt_started = self._resend_service_call_if_timeout( + response_future, event, attempt_started, timeout_sec, request, context + ) # Get response if response_future.exception() is not None: From b81b5d8c2b973f9a48105f58a87d93af20a07165 Mon Sep 17 00:00:00 2001 From: Tomohito Ando Date: Tue, 2 Sep 2025 19:33:30 +0900 Subject: [PATCH 4/6] add retry logic Signed-off-by: Tomohito Ando --- .../actions/load_composable_nodes.py | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index 018ea4e9..044095dd 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -116,7 +116,9 @@ def _resend_service_call_if_timeout( attempt_started: float, timeout_sec: float, request: composition_interfaces.srv.LoadNode.Request, - context: LaunchContext + context: LaunchContext, + retry_count: int = 0, + max_retries: int = 10 ) -> tuple: """ Resend service call if timeout occurred. @@ -127,12 +129,25 @@ def _resend_service_call_if_timeout( :param timeout_sec: timeout duration in seconds :param request: service request to resend :param context: current launch context - :return: tuple of (new_response_future, new_event, new_attempt_started) + :param retry_count: current retry attempt count + :param max_retries: maximum number of retry attempts + :return: tuple of (new_response_future, new_event, new_attempt_started, new_retry_count) """ if (time.monotonic() - attempt_started) >= timeout_sec: + if retry_count >= max_retries: + self.__logger.error( + "Maximum retry attempts ({}) exceeded for '{}' service. Giving up.".format( + max_retries, self.__rclpy_load_node_client.srv_name + ) + ) + raise RuntimeError( + "Failed to load node after {} retry attempts".format(max_retries) + ) + self.__logger.warning( - "No response from '{}' for {}s; resending service call.".format( - self.__rclpy_load_node_client.srv_name, timeout_sec + "No response from '{}' for {}s; resending service call (attempt {}/{}).".format( + self.__rclpy_load_node_client.srv_name, timeout_sec, + retry_count + 1, max_retries ) ) response_future.cancel() @@ -147,10 +162,11 @@ def unblock(future): new_response_future = self.__rclpy_load_node_client.call_async(request) new_response_future.add_done_callback(unblock) new_attempt_started = time.monotonic() + new_retry_count = retry_count + 1 - return new_response_future, new_event, new_attempt_started + return new_response_future, new_event, new_attempt_started, new_retry_count - return response_future, event, attempt_started + return response_future, event, attempt_started, retry_count def _load_node( self, @@ -233,6 +249,7 @@ def unblock(future): attempt_started = time.monotonic() # maximum wait time per attempt (seconds) timeout_sec = 30.0 + retry_count = 0 while not event.wait(1.0): if context.is_shutdown: @@ -244,8 +261,8 @@ def unblock(future): return # Resend if no response for timeout_sec - response_future, event, attempt_started = self._resend_service_call_if_timeout( - response_future, event, attempt_started, timeout_sec, request, context + response_future, event, attempt_started, retry_count = self._resend_service_call_if_timeout( + response_future, event, attempt_started, timeout_sec, request, context, retry_count ) # Get response From 3e8023113ae2fa5f33a07d2ac51e8873be90e228 Mon Sep 17 00:00:00 2001 From: Kento Yabuuchi Date: Thu, 12 Mar 2026 14:45:14 +0900 Subject: [PATCH 5/6] fix conflict Signed-off-by: Kento Yabuuchi --- .../actions/load_composable_nodes.py | 61 +++---------------- 1 file changed, 9 insertions(+), 52 deletions(-) diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index 044095dd..d474a8f4 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -203,6 +203,10 @@ def unblock(future): response_future = self.__rclpy_load_node_client.call_async(request) response_future.add_done_callback(unblock) + attempt_started = time.monotonic() + # maximum wait time per attempt (seconds) + timeout_sec = 30.0 + retry_count = 0 while not event.wait(1.0): if context.is_shutdown: @@ -213,6 +217,11 @@ def unblock(future): response_future.cancel() return + # Resend if no response for timeout_sec + response_future, event, attempt_started, retry_count = self._resend_service_call_if_timeout( + response_future, event, attempt_started, timeout_sec, request, context, retry_count + ) + # Get response if response_future.exception() is not None: raise response_future.exception() @@ -244,58 +253,6 @@ def unblock(future): ) ) - response_future = self.__rclpy_load_node_client.call_async(request) - response_future.add_done_callback(unblock) - attempt_started = time.monotonic() - # maximum wait time per attempt (seconds) - timeout_sec = 30.0 - retry_count = 0 - - while not event.wait(1.0): - if context.is_shutdown: - self.__logger.warning( - "Abandoning wait for the '{}' service response, due to shutdown.".format( - self.__rclpy_load_node_client.srv_name), - ) - response_future.cancel() - return - - # Resend if no response for timeout_sec - response_future, event, attempt_started, retry_count = self._resend_service_call_if_timeout( - response_future, event, attempt_started, timeout_sec, request, context, retry_count - ) - - # Get response - if response_future.exception() is not None: - raise response_future.exception() - response = response_future.result() - - self.__logger.debug("Received response '{}'".format(response)) - - node_name = response.full_node_name if response.full_node_name else request.node_name - if response.success: - if node_name is not None: - add_node_name(context, node_name) - node_name_count = get_node_name_count(context, node_name) - if node_name_count > 1: - container_logger = launch.logging.get_logger( - self.__final_target_container_name - ) - container_logger.warning( - 'there are now at least {} nodes with the name {} created within this ' - 'launch context'.format(node_name_count, node_name) - ) - self.__logger.info("Loaded node '{}' in container '{}'".format( - response.full_node_name, self.__final_target_container_name - )) - else: - self.__logger.error( - "Failed to load node '{}' of type '{}' in container '{}': {}".format( - node_name, request.plugin_name, self.__final_target_container_name, - response.error_message - ) - ) - def _load_in_sequence( self, load_node_requests: List[composition_interfaces.srv.LoadNode.Request], From c706ef7c20768114d94731c1ca185cc2a25c467d Mon Sep 17 00:00:00 2001 From: YuxuanLiuTier4Desktop <619684051@qq.com> Date: Fri, 6 Mar 2026 18:14:26 +0900 Subject: [PATCH 6/6] feat: improve logging for load composable node failure Signed-off-by: YuxuanLiuTier4Desktop <619684051@qq.com> Signed-off-by: Kento Yabuuchi --- .../actions/load_composable_nodes.py | 24 ++- .../actions/test_load_composable_nodes.py | 152 ++++++++++++++++++ 2 files changed, 170 insertions(+), 6 deletions(-) diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index d474a8f4..cc9764a1 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -134,20 +134,32 @@ def _resend_service_call_if_timeout( :return: tuple of (new_response_future, new_event, new_attempt_started, new_retry_count) """ if (time.monotonic() - attempt_started) >= timeout_sec: + node_ident = request.node_name if request.node_name else '' if retry_count >= max_retries: self.__logger.error( - "Maximum retry attempts ({}) exceeded for '{}' service. Giving up.".format( - max_retries, self.__rclpy_load_node_client.srv_name + "Maximum retry attempts ({}) exceeded when loading node '{}' " + "(package='{}', plugin='{}') in container '{}'. Giving up.".format( + max_retries, node_ident, request.package_name, request.plugin_name, + self.__final_target_container_name ) ) raise RuntimeError( - "Failed to load node after {} retry attempts".format(max_retries) + "Failed to load composable node '{}' (package='{}', plugin='{}') " + "in container '{}' after {} retry attempts".format( + node_ident, request.package_name, request.plugin_name, + self.__final_target_container_name, max_retries + ) ) + service_ready = self.__rclpy_load_node_client.service_is_ready() self.__logger.warning( - "No response from '{}' for {}s; resending service call (attempt {}/{}).".format( - self.__rclpy_load_node_client.srv_name, timeout_sec, - retry_count + 1, max_retries + "No response from '{}' for {}s when loading node '{}' " + "(package='{}', plugin='{}') in container '{}'; " + "resending service call (attempt {}/{}, service available: {}).".format( + self.__rclpy_load_node_client.srv_name, timeout_sec, + node_ident, request.package_name, request.plugin_name, + self.__final_target_container_name, + retry_count + 1, max_retries, service_ready ) ) response_future.cancel() diff --git a/test_launch_ros/test/test_launch_ros/actions/test_load_composable_nodes.py b/test_launch_ros/test/test_launch_ros/actions/test_load_composable_nodes.py index 74c4b494..ba5b15b6 100644 --- a/test_launch_ros/test/test_launch_ros/actions/test_load_composable_nodes.py +++ b/test_launch_ros/test/test_launch_ros/actions/test_load_composable_nodes.py @@ -16,6 +16,8 @@ import pathlib import threading +from unittest.mock import MagicMock +from unittest.mock import patch from composition_interfaces.srv import LoadNode @@ -23,6 +25,7 @@ from launch import LaunchService from launch.actions import GroupAction from launch.conditions import IfCondition +from launch.launch_context import LaunchContext from launch_ros.actions import LoadComposableNodes from launch_ros.actions import PushROSNamespace from launch_ros.actions import SetRemap @@ -662,3 +665,152 @@ def test_load_node_with_condition_in_group(mock_component_container): assert len(request.remap_rules) == 0 assert len(request.parameters) == 0 assert len(request.extra_arguments) == 0 + +def test_load_node_timeout_max_retries_error_message(): + """Test that RuntimeError on max retries includes node identity for debugging.""" + action = LoadComposableNodes( + target_container=f'/{TEST_CONTAINER_NAME}', + composable_node_descriptions=[ + ComposableNode( + package='pointcloud_preprocessor', + plugin='pointcloud_preprocessor::RingOutlierFilterComponent', + name='ring_outlier_filter', + namespace='sensing', + ) + ] + ) + + # Set private attributes required by _resend_service_call_if_timeout + mock_client = MagicMock() + mock_client.srv_name = 'pointcloud_container/_container/load_node' + mock_client.service_is_ready.return_value = True + action._LoadComposableNodes__rclpy_load_node_client = mock_client + action._LoadComposableNodes__final_target_container_name = 'pointcloud_container' + + request = LoadNode.Request() + request.package_name = 'pointcloud_preprocessor' + request.plugin_name = 'pointcloud_preprocessor::RingOutlierFilterComponent' + request.node_name = 'ring_outlier_filter' + request.node_namespace = '/sensing' + + mock_future = MagicMock() + mock_event = threading.Event() + context = LaunchContext() + + with patch('time.monotonic', return_value=100.0): + with pytest.raises(RuntimeError) as exc_info: + action._resend_service_call_if_timeout( + response_future=mock_future, + event=mock_event, + attempt_started=0.0, + timeout_sec=30.0, + request=request, + context=context, + retry_count=10, + max_retries=10, + ) + + err_msg = str(exc_info.value) + assert 'ring_outlier_filter' in err_msg + assert 'pointcloud_preprocessor' in err_msg + assert 'RingOutlierFilterComponent' in err_msg + assert 'pointcloud_container' in err_msg + assert '10 retry attempts' in err_msg + + +def test_load_node_timeout_max_retries_error_message_unnamed_node(): + """Test RuntimeError message when node_name is empty (unnamed node).""" + action = LoadComposableNodes( + target_container=f'/{TEST_CONTAINER_NAME}', + composable_node_descriptions=[ + ComposableNode( + package='my_pkg', + plugin='my_plugin::MyComponent', + ) + ] + ) + + mock_client = MagicMock() + mock_client.srv_name = 'container/_container/load_node' + mock_client.service_is_ready.return_value = False + action._LoadComposableNodes__rclpy_load_node_client = mock_client + action._LoadComposableNodes__final_target_container_name = 'container' + + request = LoadNode.Request() + request.package_name = 'my_pkg' + request.plugin_name = 'my_plugin::MyComponent' + request.node_name = '' + request.node_namespace = '/' + + with patch('time.monotonic', return_value=100.0): + with pytest.raises(RuntimeError) as exc_info: + action._resend_service_call_if_timeout( + response_future=MagicMock(), + event=threading.Event(), + attempt_started=0.0, + timeout_sec=30.0, + request=request, + context=LaunchContext(), + retry_count=10, + max_retries=10, + ) + + err_msg = str(exc_info.value) + assert '' in err_msg + assert 'my_pkg' in err_msg + assert 'MyComponent' in err_msg + assert 'container' in err_msg + + +def test_load_node_timeout_retry_warning_message(): + """Test that retry path calls service_is_ready and resends the request.""" + action = LoadComposableNodes( + target_container=f'/{TEST_CONTAINER_NAME}', + composable_node_descriptions=[ + ComposableNode( + package='foo_package', + plugin='bar_plugin', + name='test_node_name', + ) + ] + ) + + mock_client = MagicMock() + mock_client.srv_name = 'mock_component_container/_container/load_node' + mock_client.service_is_ready.return_value = False + mock_future = MagicMock() + mock_client.call_async.return_value = mock_future + action._LoadComposableNodes__rclpy_load_node_client = mock_client + action._LoadComposableNodes__final_target_container_name = 'mock_component_container' + + request = LoadNode.Request() + request.package_name = 'foo_package' + request.plugin_name = 'bar_plugin' + request.node_name = 'test_node_name' + request.node_namespace = '/' + + mock_response_future = MagicMock() + + with patch('time.monotonic', return_value=100.0): + result = action._resend_service_call_if_timeout( + response_future=mock_response_future, + event=threading.Event(), + attempt_started=0.0, + timeout_sec=30.0, + request=request, + context=LaunchContext(), + retry_count=0, + max_retries=10, + ) + + # Should have retried (returned new future, event, etc.) + assert result[3] == 1 # new retry_count + + # Verify service_is_ready was called for diagnostic logging + mock_client.service_is_ready.assert_called_once() + + # Verify call_async was invoked to resend the request + mock_client.call_async.assert_called_once_with(request) + + # Verify original future was cancelled + mock_response_future.cancel.assert_called_once()