diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index f2ae736a2..3172fce8b 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -25,6 +25,7 @@ import datetime import multiprocessing.forkserver import sys +import threading from concurrent.futures import ThreadPoolExecutor # Local imports @@ -138,12 +139,6 @@ def run(self, queues, casd_process_manager): # Hold on to the queues to process self.queues = queues - # NOTE: Enforce use of `SafeChildWatcher` as we generally don't want - # background threads. - # In Python 3.8+, `ThreadedChildWatcher` is the default watcher, and - # not `SafeChildWatcher`. - asyncio.set_child_watcher(asyncio.SafeChildWatcher()) # pylint: disable=deprecated-class - # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. self.loop = asyncio.new_event_loop() @@ -160,12 +155,7 @@ def run(self, queues, casd_process_manager): # Watch casd while running to ensure it doesn't die self._casd_process = casd_process_manager.process - _watcher = asyncio.get_child_watcher() - - def abort_casd(pid, returncode): - asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode) - - _watcher.add_child_handler(self._casd_process.pid, abort_casd) + threading.Thread(target=self._watch_casd, name="watch-casd", daemon=True).start() # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): @@ -185,8 +175,6 @@ def abort_casd(pid, returncode): # Invoke the ticker callback a final time to render pending messages self._ticker_callback() - # Stop watching casd - _watcher.remove_child_handler(self._casd_process.pid) self._casd_process = None # Stop handling unix signals @@ -309,16 +297,24 @@ def job_completed(self, job, status): # This will terminate immediately all jobs, since buildbox-casd is dead, # we can't do anything with them anymore. # - # Args: - # pid (int): the process id under which buildbox-casd was running - # returncode (int): the return code with which buildbox-casd exited - # - def _abort_on_casd_failure(self, pid, returncode): + def _abort_on_casd_failure(self): self.context.messenger.bug("buildbox-casd died while the pipeline was active.") - - self._casd_process.returncode = returncode self.terminate() + # _watch_casd() + # + # This runs in a separate thread to detect casd exiting while the loop is + # still running. + # + def _watch_casd(self): + loop = self.loop + proc = self._casd_process + if loop and proc: + # This sets the `returncode` attribute + proc.wait() + if not loop.is_closed(): + loop.call_soon_threadsafe(self._abort_on_casd_failure) + # _start_job() # # Spanws a job