Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions launch/launch/actions/execute_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from ..conditions import evaluate_condition_expression
from ..descriptions import Executable
from ..event import Event
from ..event_handler import BaseEventHandler
from ..event_handler import EventHandler
from ..event_handlers import OnProcessExit
from ..event_handlers import OnProcessIO
Expand Down Expand Up @@ -223,6 +224,9 @@ def __init__(
self.__stderr_buffer = io.StringIO()

self.__executed = False
self.__registered_event_handlers: List[BaseEventHandler] = []
self.__context: Optional[LaunchContext] = None
self.__deferred_shutdown_handler: Optional[BaseEventHandler] = None

@property
def process_description(self) -> Executable:
Expand Down Expand Up @@ -283,10 +287,16 @@ def _shutdown_process(self, context: LaunchContext, *, send_sigint: bool
# Defer shut down if the process is scheduled to be started
if (self.process_details is None or self._subprocess_transport is None):
# Do not set shutdown result, as event is postponed
context.register_event_handler(
OnProcessStart(
on_start=lambda event, context:
self._shutdown_process(context, send_sigint=send_sigint)))
# Remove previously registered deferred shutdown handler to avoid accumulation
if self.__deferred_shutdown_handler is not None:
try:
context.unregister_event_handler(self.__deferred_shutdown_handler)
except ValueError:
pass
self.__deferred_shutdown_handler = OnProcessStart(
on_start=lambda event, context:
self._shutdown_process(context, send_sigint=send_sigint))
context.register_event_handler(self.__deferred_shutdown_handler)
return None

self.__shutdown_future.set_result(None)
Expand Down Expand Up @@ -413,6 +423,13 @@ def __flush_buffers(self, event, context):
self.__stderr_buffer.seek(0)
self.__stderr_buffer.truncate(0)

# Unregister event handlers if the action is complete (not respawning).
# This is done here rather than in __cleanup() because __cleanup() runs
# before the ProcessExited event is processed from the queue, and
# unregistering there would prevent on_exit callbacks from firing.
if self.__completed_future is not None and self.__completed_future.done():
self.__unregister_event_handlers()

def __on_process_output_cached(
self, event: ProcessIO, buffer, logger
) -> None:
Expand Down Expand Up @@ -442,6 +459,10 @@ def __flush_cached_buffers(self, event, context):
self.__output_format.format(line=line, this=self)
)

# Unregister event handlers if the action is complete (not respawning).
if self.__completed_future is not None and self.__completed_future.done():
self.__unregister_event_handlers()

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeEntitiesType]:
due_to_sigint = cast(Shutdown, event).due_to_sigint
return self._shutdown_process(
Expand Down Expand Up @@ -504,6 +525,24 @@ def __get_sigint_event(self) -> EmitEvent:
process_matcher=matches_action(self),
))

def __unregister_event_handlers(self) -> None:
"""Unregister all event handlers registered by this action."""
context = self.__context
if context is None:
return
for event_handler in self.__registered_event_handlers:
try:
context.unregister_event_handler(event_handler)
except ValueError:
pass
self.__registered_event_handlers.clear()
if self.__deferred_shutdown_handler is not None:
try:
context.unregister_event_handler(self.__deferred_shutdown_handler)
except ValueError:
pass
self.__deferred_shutdown_handler = None

def __cleanup(self) -> None:
# Cancel any pending timers we started.
if self.__sigterm_timer is not None:
Expand All @@ -514,6 +553,8 @@ def __cleanup(self) -> None:
if self._subprocess_transport is not None:
self._subprocess_transport.close()
# Signal that we're done to the launch system.
# Event handlers are unregistered in __flush_buffers/__flush_cached_buffers
# after all ProcessExited handlers (including on_exit) have had a chance to fire.
if self.__completed_future is not None:
self.__completed_future.set_result(None)

Expand Down Expand Up @@ -583,6 +624,8 @@ async def __execute_process(self, context: LaunchContext) -> None:
self.__logger.error('exception occurred while executing process:\n{}'.format(
traceback.format_exc()
))
# No ProcessExited event will be emitted, so unregister handlers directly.
self.__unregister_event_handlers()
self.__cleanup()
return

Expand Down Expand Up @@ -703,6 +746,8 @@ def execute(self, context: LaunchContext) -> None:
]
for event_handler in event_handlers:
context.register_event_handler(event_handler)
self.__registered_event_handlers = list(event_handlers)
self.__context = context

try:
self.__completed_future = context.asyncio_loop.create_future()
Expand All @@ -720,8 +765,7 @@ def execute(self, context: LaunchContext) -> None:
launch.logging.get_output_loggers(name, self.__output)
context.asyncio_loop.create_task(self.__execute_process(context))
except Exception:
for event_handler in event_handlers:
context.unregister_event_handler(event_handler)
self.__unregister_event_handlers()
raise
return None

Expand Down
92 changes: 92 additions & 0 deletions launch/test/launch/test_execute_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,95 @@ def test_execute_process_with_output_dictionary():
ls = LaunchService()
ls.include_launch_description(ld)
assert 0 == ls.run()


def test_event_handlers_cleaned_up_after_process_exit():
"""Test that event handlers are unregistered after a process exits."""
ls = LaunchService()
initial_handler_count = len(ls.context._event_handlers)

executable = ExecuteLocal(
process_description=Executable(
cmd=[sys.executable, '-c', "print('hello')"]
),
output='screen'
)
ld = LaunchDescription([executable])
ls.include_launch_description(ld)
assert 0 == ls.run()

final_handler_count = len(ls.context._event_handlers)
assert final_handler_count == initial_handler_count, (
f'Expected {initial_handler_count} handlers after cleanup, '
f'but found {final_handler_count}'
)


def test_event_handlers_stable_during_respawn():
"""Test that event handler count stays stable across respawn cycles."""
handler_counts = []

def on_exit_callback(event, context):
handler_counts.append(len(context._event_handlers))

shutdown_time = 4.0
respawn_delay = 1.0

executable = ExecuteLocal(
process_description=Executable(
cmd=[sys.executable, '-c', "print('respawn test')"]
),
respawn=True,
respawn_delay=respawn_delay,
on_exit=on_exit_callback,
output='screen'
)

ls = LaunchService()

ld = LaunchDescription([
executable,
TimerAction(
period=shutdown_time,
actions=[
Shutdown(reason='Timer expired')
]
)
])
ls.include_launch_description(ld)
assert 0 == ls.run()

# Handler count should remain stable during respawn (not growing)
assert len(handler_counts) >= 2, (
f'Expected at least 2 process exits, got {len(handler_counts)}'
)
assert all(c == handler_counts[0] for c in handler_counts), (
f'Handler counts should be stable across respawns, got: {handler_counts}'
)


def test_multiple_processes_handler_cleanup():
"""Test that multiple processes don't leak event handlers."""
ls = LaunchService()
initial_handler_count = len(ls.context._event_handlers)

executables = [
ExecuteLocal(
process_description=Executable(
cmd=[sys.executable, '-c', f"print('process {i}')"]
),
output='screen'
)
for i in range(5)
]

ld = LaunchDescription(executables)
ls.include_launch_description(ld)
assert 0 == ls.run()

final_handler_count = len(ls.context._event_handlers)
assert final_handler_count == initial_handler_count, (
f'Expected {initial_handler_count} handlers after cleanup, '
f'but found {final_handler_count}. '
f'Leaked {final_handler_count - initial_handler_count} handlers from 5 processes.'
)