Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions src/buildstream/_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import datetime
import multiprocessing.forkserver
import sys
import threading
from concurrent.futures import ThreadPoolExecutor

# Local imports
Expand Down Expand Up @@ -138,12 +139,6 @@ def run(self, queues, casd_process_manager):
# Hold on to the queues to process
self.queues = queues

# NOTE: Enforce use of `SafeChildWatcher` as we generally don't want
# background threads.
# In Python 3.8+, `ThreadedChildWatcher` is the default watcher, and
# not `SafeChildWatcher`.
asyncio.set_child_watcher(asyncio.SafeChildWatcher()) # pylint: disable=deprecated-class

# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
Expand All @@ -160,12 +155,7 @@ def run(self, queues, casd_process_manager):

# Watch casd while running to ensure it doesn't die
self._casd_process = casd_process_manager.process
_watcher = asyncio.get_child_watcher()

def abort_casd(pid, returncode):
asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode)

_watcher.add_child_handler(self._casd_process.pid, abort_casd)
threading.Thread(target=self._watch_casd, name="watch-casd", daemon=True).start()

# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
Expand All @@ -185,8 +175,6 @@ def abort_casd(pid, returncode):
# Invoke the ticker callback a final time to render pending messages
self._ticker_callback()

# Stop watching casd
_watcher.remove_child_handler(self._casd_process.pid)
self._casd_process = None

# Stop handling unix signals
Expand Down Expand Up @@ -309,16 +297,24 @@ def job_completed(self, job, status):
# This will terminate immediately all jobs, since buildbox-casd is dead,
# we can't do anything with them anymore.
#
# Args:
# pid (int): the process id under which buildbox-casd was running
# returncode (int): the return code with which buildbox-casd exited
#
def _abort_on_casd_failure(self, pid, returncode):
def _abort_on_casd_failure(self):
self.context.messenger.bug("buildbox-casd died while the pipeline was active.")

self._casd_process.returncode = returncode
self.terminate()

# _watch_casd()
#
# This runs in a separate thread to detect casd exiting while the loop is
# still running.
#
def _watch_casd(self):
    # Body of the "watch-casd" thread: block until the casd process exits,
    # then hand `_abort_on_casd_failure` over to the scheduler's event loop.
    #
    # Both attributes are captured up front so that the rest of this
    # function keeps working on the same objects even if they are
    # reassigned on `self` while this thread is blocked.
    loop = self.loop
    proc = self._casd_process
    if not (loop and proc):
        # Nothing to watch, or no loop to report to
        return

    # This sets the `returncode` attribute
    proc.wait()

    if loop.is_closed():
        # The loop is already gone, nobody is left to notify
        return

    try:
        # The callback must run on the loop's own thread, so only schedule
        # it from here instead of calling it directly.
        loop.call_soon_threadsafe(self._abort_on_casd_failure)
    except RuntimeError:
        # The loop can be closed concurrently, between the `is_closed()`
        # check above and this call; scheduling on a closed loop raises
        # RuntimeError. That is equivalent to the "already closed" case,
        # so there is deliberately nothing more to do.
        pass

# _start_job()
#
# Spawns a job
Expand Down
Loading