From 0b36f640bd27466ff1eddc806098d50323d394b5 Mon Sep 17 00:00:00 2001 From: Brian Kroth Date: Mon, 28 Apr 2025 21:38:49 +0000 Subject: [PATCH] Delay enter trial runner context until the trial is actually being run. This is part of an attempt to try and see if can work around issues with `multiprocessing.Pool` needing to pickle certain objects when forking. For instance, if the Environment is using an SshServer, we need to start an EventLoopContext in the background to handle the SSH connections and threads are not picklable. Nor are file handles, DB connections, etc., so there may be other things we also need to adjust to make this work. See Also #967 --- mlos_bench/mlos_bench/schedulers/base_scheduler.py | 11 +++++++---- mlos_bench/mlos_bench/schedulers/sync_scheduler.py | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index eaa5527c6d6..b711388609d 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -200,8 +200,9 @@ def __enter__(self) -> "Scheduler": _LOG.debug("Scheduler START :: %s", self) assert self.experiment is None assert not self._in_context - for trial_runner in self._trial_runners.values(): - trial_runner.__enter__() + # NOTE: We delay entering the context of trial_runners until it's time + # to run the trial in order to avoid incompatibilities with + # multiprocessing.Pool. self._optimizer.__enter__() # Start new or resume the existing experiment. Verify that the # experiment configuration is compatible with the previous runs. @@ -235,7 +236,8 @@ def __exit__( self._experiment.__exit__(ex_type, ex_val, ex_tb) self._optimizer.__exit__(ex_type, ex_val, ex_tb) for trial_runner in self._trial_runners.values(): - trial_runner.__exit__(ex_type, ex_val, ex_tb) + # TrialRunners should have already exited their context after running the Trial. + assert not trial_runner._in_context # pylint: disable=protected-access self._experiment = None self._in_context = False return False # Do not suppress exceptions @@ -267,7 +269,8 @@ def teardown(self) -> None: if self._do_teardown: for trial_runner in self._trial_runners.values(): assert not trial_runner.is_running - trial_runner.teardown() + with trial_runner: + trial_runner.teardown() def get_best_observation(self) -> tuple[dict[str, float] | None, TunableGroups | None]: """Get the best observation from the optimizer.""" diff --git a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py index 4b864942dce..f450b28b8f1 100644 --- a/mlos_bench/mlos_bench/schedulers/sync_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/sync_scheduler.py @@ -39,5 +39,6 @@ def run_trial(self, trial: Storage.Trial) -> None: super().run_trial(trial) # In the sync scheduler we run each trial on its own TrialRunner in sequence. trial_runner = self.get_trial_runner(trial) - trial_runner.run_trial(trial, self.global_config) - _LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner) + with trial_runner: + trial_runner.run_trial(trial, self.global_config) + _LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner)