From 4c9655779ec72fba6d65b3467b548fd4b9903e13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 19 Sep 2025 09:44:21 +0200 Subject: [PATCH] scheduler.py: Replace asyncio child watcher with our own watcher thread The asyncio child watcher system was deprecated in Python 3.12 and will be fully removed in Python 3.14. We use it only to watch a single process (buildbox-casd) and can use a simple watcher thread for that. Fixes #1990. --- src/buildstream/_scheduler/scheduler.py | 38 +++++++++++-------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index f2ae736a2..3172fce8b 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -25,6 +25,7 @@ import datetime import multiprocessing.forkserver import sys +import threading from concurrent.futures import ThreadPoolExecutor # Local imports @@ -138,12 +139,6 @@ def run(self, queues, casd_process_manager): # Hold on to the queues to process self.queues = queues - # NOTE: Enforce use of `SafeChildWatcher` as we generally don't want - # background threads. - # In Python 3.8+, `ThreadedChildWatcher` is the default watcher, and - # not `SafeChildWatcher`. - asyncio.set_child_watcher(asyncio.SafeChildWatcher()) # pylint: disable=deprecated-class - # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. self.loop = asyncio.new_event_loop() @@ -160,12 +155,7 @@ def run(self, queues, casd_process_manager): # Watch casd while running to ensure it doesn't die self._casd_process = casd_process_manager.process - _watcher = asyncio.get_child_watcher() - - def abort_casd(pid, returncode): - asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode) - - _watcher.add_child_handler(self._casd_process.pid, abort_casd) + threading.Thread(target=self._watch_casd, name="watch-casd", daemon=True).start() # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): @@ -185,8 +175,6 @@ def abort_casd(pid, returncode): # Invoke the ticker callback a final time to render pending messages self._ticker_callback() - # Stop watching casd - _watcher.remove_child_handler(self._casd_process.pid) self._casd_process = None # Stop handling unix signals @@ -309,16 +297,24 @@ def job_completed(self, job, status): # This will terminate immediately all jobs, since buildbox-casd is dead, # we can't do anything with them anymore. # - # Args: - # pid (int): the process id under which buildbox-casd was running - # returncode (int): the return code with which buildbox-casd exited - # - def _abort_on_casd_failure(self, pid, returncode): + def _abort_on_casd_failure(self): self.context.messenger.bug("buildbox-casd died while the pipeline was active.") - - self._casd_process.returncode = returncode self.terminate() + # _watch_casd() + # + # This runs in a separate thread to detect casd exiting while the loop is + # still running. + # + def _watch_casd(self): + loop = self.loop + proc = self._casd_process + if loop and proc: + # This sets the `returncode` attribute + proc.wait() + if not loop.is_closed(): + loop.call_soon_threadsafe(self._abort_on_casd_failure) + # _start_job() # # Spanws a job