From 54a8377cfe68973d08cdb472c87c191023a720d2 Mon Sep 17 00:00:00 2001 From: Koki Shinjo Date: Wed, 4 Mar 2026 22:05:36 +0900 Subject: [PATCH] Fix event handler leak in ExecuteLocal by unregistering handlers on cleanup ExecuteLocal registers 6 event handlers in execute() but never unregisters them when the process exits. This causes a memory leak proportional to the number of processes launched over the lifetime of a LaunchService, as the handlers and all objects they reference remain in context._event_handlers indefinitely. This change: - Stores registered event handlers and context as instance variables - Adds __unregister_event_handlers() helper that safely removes all handlers registered by this action - Unregisters handlers in __flush_buffers/__flush_cached_buffers after all ProcessExited handlers (including on_exit) have fired - Unregisters handlers when process fails to start (no ProcessExited) - Fixes _shutdown_process() deferred handler accumulation by tracking and replacing the OnProcessStart handler instead of creating new ones - Adds tests verifying handler cleanup and stability during respawn Fixes https://github.com/ros2/launch/issues/565 Signed-off-by: Koki Shinjo --- launch/launch/actions/execute_local.py | 56 +++++++++++++-- launch/test/launch/test_execute_local.py | 92 ++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) diff --git a/launch/launch/actions/execute_local.py b/launch/launch/actions/execute_local.py index 107d86c30..2789f4d12 100644 --- a/launch/launch/actions/execute_local.py +++ b/launch/launch/actions/execute_local.py @@ -44,6 +44,7 @@ from ..conditions import evaluate_condition_expression from ..descriptions import Executable from ..event import Event +from ..event_handler import BaseEventHandler from ..event_handler import EventHandler from ..event_handlers import OnProcessExit from ..event_handlers import OnProcessIO @@ -223,6 +224,9 @@ def __init__( self.__stderr_buffer = io.StringIO() self.__executed = False + self.__registered_event_handlers: List[BaseEventHandler] = [] + self.__context: Optional[LaunchContext] = None + self.__deferred_shutdown_handler: Optional[BaseEventHandler] = None @property def process_description(self) -> Executable: @@ -283,10 +287,16 @@ def _shutdown_process(self, context: LaunchContext, *, send_sigint: bool # Defer shut down if the process is scheduled to be started if (self.process_details is None or self._subprocess_transport is None): # Do not set shutdown result, as event is postponed - context.register_event_handler( - OnProcessStart( - on_start=lambda event, context: - self._shutdown_process(context, send_sigint=send_sigint))) + # Remove previously registered deferred shutdown handler to avoid accumulation + if self.__deferred_shutdown_handler is not None: + try: + context.unregister_event_handler(self.__deferred_shutdown_handler) + except ValueError: + pass + self.__deferred_shutdown_handler = OnProcessStart( + on_start=lambda event, context: + self._shutdown_process(context, send_sigint=send_sigint)) + context.register_event_handler(self.__deferred_shutdown_handler) return None self.__shutdown_future.set_result(None) @@ -413,6 +423,13 @@ def __flush_buffers(self, event, context): self.__stderr_buffer.seek(0) self.__stderr_buffer.truncate(0) + # Unregister event handlers if the action is complete (not respawning). + # This is done here rather than in __cleanup() because __cleanup() runs + # before the ProcessExited event is processed from the queue, and + # unregistering there would prevent on_exit callbacks from firing. + if self.__completed_future is not None and self.__completed_future.done(): + self.__unregister_event_handlers() + def __on_process_output_cached( self, event: ProcessIO, buffer, logger ) -> None: @@ -442,6 +459,10 @@ def __flush_cached_buffers(self, event, context): self.__output_format.format(line=line, this=self) ) + # Unregister event handlers if the action is complete (not respawning). + if self.__completed_future is not None and self.__completed_future.done(): + self.__unregister_event_handlers() + def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeEntitiesType]: due_to_sigint = cast(Shutdown, event).due_to_sigint return self._shutdown_process( @@ -504,6 +525,24 @@ def __get_sigint_event(self) -> EmitEvent: process_matcher=matches_action(self), )) + def __unregister_event_handlers(self) -> None: + """Unregister all event handlers registered by this action.""" + context = self.__context + if context is None: + return + for event_handler in self.__registered_event_handlers: + try: + context.unregister_event_handler(event_handler) + except ValueError: + pass + self.__registered_event_handlers.clear() + if self.__deferred_shutdown_handler is not None: + try: + context.unregister_event_handler(self.__deferred_shutdown_handler) + except ValueError: + pass + self.__deferred_shutdown_handler = None + def __cleanup(self) -> None: # Cancel any pending timers we started. if self.__sigterm_timer is not None: @@ -514,6 +553,8 @@ def __cleanup(self) -> None: if self._subprocess_transport is not None: self._subprocess_transport.close() # Signal that we're done to the launch system. + # Event handlers are unregistered in __flush_buffers/__flush_cached_buffers + # after all ProcessExited handlers (including on_exit) have had a chance to fire. if self.__completed_future is not None: self.__completed_future.set_result(None) @@ -583,6 +624,8 @@ async def __execute_process(self, context: LaunchContext) -> None: self.__logger.error('exception occurred while executing process:\n{}'.format( traceback.format_exc() )) + # No ProcessExited event will be emitted, so unregister handlers directly. + self.__unregister_event_handlers() self.__cleanup() return @@ -703,6 +746,8 @@ def execute(self, context: LaunchContext) -> None: ] for event_handler in event_handlers: context.register_event_handler(event_handler) + self.__registered_event_handlers = list(event_handlers) + self.__context = context try: self.__completed_future = context.asyncio_loop.create_future() @@ -720,8 +765,7 @@ def execute(self, context: LaunchContext) -> None: launch.logging.get_output_loggers(name, self.__output) context.asyncio_loop.create_task(self.__execute_process(context)) except Exception: - for event_handler in event_handlers: - context.unregister_event_handler(event_handler) + self.__unregister_event_handlers() raise return None diff --git a/launch/test/launch/test_execute_local.py b/launch/test/launch/test_execute_local.py index af515630c..ca0ea5994 100644 --- a/launch/test/launch/test_execute_local.py +++ b/launch/test/launch/test_execute_local.py @@ -178,3 +178,95 @@ def test_execute_process_with_output_dictionary(): ls = LaunchService() ls.include_launch_description(ld) assert 0 == ls.run() + + +def test_event_handlers_cleaned_up_after_process_exit(): + """Test that event handlers are unregistered after a process exits.""" + ls = LaunchService() + initial_handler_count = len(ls.context._event_handlers) + + executable = ExecuteLocal( + process_description=Executable( + cmd=[sys.executable, '-c', "print('hello')"] + ), + output='screen' + ) + ld = LaunchDescription([executable]) + ls.include_launch_description(ld) + assert 0 == ls.run() + + final_handler_count = len(ls.context._event_handlers) + assert final_handler_count == initial_handler_count, ( + f'Expected {initial_handler_count} handlers after cleanup, ' + f'but found {final_handler_count}' + ) + + +def test_event_handlers_stable_during_respawn(): + """Test that event handler count stays stable across respawn cycles.""" + handler_counts = [] + + def on_exit_callback(event, context): + handler_counts.append(len(context._event_handlers)) + + shutdown_time = 4.0 + respawn_delay = 1.0 + + executable = ExecuteLocal( + process_description=Executable( + cmd=[sys.executable, '-c', "print('respawn test')"] + ), + respawn=True, + respawn_delay=respawn_delay, + on_exit=on_exit_callback, + output='screen' + ) + + ls = LaunchService() + + ld = LaunchDescription([ + executable, + TimerAction( + period=shutdown_time, + actions=[ + Shutdown(reason='Timer expired') + ] + ) + ]) + ls.include_launch_description(ld) + assert 0 == ls.run() + + # Handler count should remain stable during respawn (not growing) + assert len(handler_counts) >= 2, ( + f'Expected at least 2 process exits, got {len(handler_counts)}' + ) + assert all(c == handler_counts[0] for c in handler_counts), ( + f'Handler counts should be stable across respawns, got: {handler_counts}' + ) + + +def test_multiple_processes_handler_cleanup(): + """Test that multiple processes don't leak event handlers.""" + ls = LaunchService() + initial_handler_count = len(ls.context._event_handlers) + + executables = [ + ExecuteLocal( + process_description=Executable( + cmd=[sys.executable, '-c', f"print('process {i}')"] + ), + output='screen' + ) + for i in range(5) + ] + + ld = LaunchDescription(executables) + ls.include_launch_description(ld) + assert 0 == ls.run() + + final_handler_count = len(ls.context._event_handlers) + assert final_handler_count == initial_handler_count, ( + f'Expected {initial_handler_count} handlers after cleanup, ' + f'but found {final_handler_count}. ' + f'Leaked {final_handler_count - initial_handler_count} handlers from 5 processes.' + )