From 8b6e172845dd6fffdce37ab62c38cb730c540047 Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Wed, 23 Apr 2025 10:39:14 -0500 Subject: [PATCH 01/14] add parallel schedular --- .../schedulers/parallel_scheduler.jsonc | 12 ++ .../schemas/schedulers/scheduler-schema.json | 39 +++-- mlos_bench/mlos_bench/schedulers/__init__.py | 2 + .../schedulers/parallel_scheduler.py | 145 ++++++++++++++++++ .../mlos_bench/schedulers/trial_runner.py | 85 ++++++++-- .../invalid/parallel_sched-bad-repeat.jsonc | 6 + .../invalid/parallel_sched-empty-config.jsonc | 5 + .../bad/unhandled/parallel_sched-extra.jsonc | 6 + .../good/full/parallel_sched-full.jsonc | 12 ++ .../good/partial/parallel_sched-partial.jsonc | 7 + .../tests/launcher_parse_args_test.py | 64 +++++++- .../mlos_bench/tests/storage/conftest.py | 2 + .../mlos_bench/tests/storage/sql/fixtures.py | 133 +++++++++++++++- .../tests/storage/trial_schedule_test.py | 31 +++- 14 files changed, 517 insertions(+), 32 deletions(-) create mode 100644 mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc create mode 100644 mlos_bench/mlos_bench/schedulers/parallel_scheduler.py create mode 100644 mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc create mode 100644 mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc create mode 100644 mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc create mode 100644 mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc create mode 100644 mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc diff --git a/mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc b/mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc new file mode 100644 index 00000000000..4d6b7a7e272 --- /dev/null +++ 
b/mlos_bench/mlos_bench/config/schedulers/parallel_scheduler.jsonc @@ -0,0 +1,12 @@ +// Mock optimizer to test the benchmarking framework. +{ + "$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", + + "class": "mlos_bench.schedulers.ParallelScheduler", + + "config": { + "trial_config_repeat_count": 3, + "max_trials": -1, // Limited only in the Optimizer logic/config. + "teardown": false + } +} diff --git a/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json b/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json index 81b2e797547..dedac1ed758 100644 --- a/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json +++ b/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json @@ -2,12 +2,10 @@ "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", "title": "mlos_bench Scheduler config", - "$defs": { "comment": { "$comment": "This section contains reusable partial schema bits (or just split out for readability)" }, - "config_base_scheduler": { "$comment": "config properties common to all Scheduler types.", "type": "object", @@ -29,18 +27,23 @@ "description": "Max. number of trials to run. 
Use -1 or 0 for unlimited.", "type": "integer", "minimum": -1, - "examples": [50, -1] + "examples": [ + 50, + -1 + ] }, "trial_config_repeat_count": { "description": "Number of times to repeat a config.", "type": "integer", "minimum": 1, - "examples": [3, 5] + "examples": [ + 3, + 5 + ] } } } }, - "description": "config for the mlos_bench scheduler", "$comment": "top level schema document rules", "type": "object", @@ -51,21 +54,20 @@ "$comment": "This is optional, but if provided, should match the name of this file.", "pattern": "/schemas/schedulers/scheduler-schema.json$" }, - "description": { "description": "Optional description of the config.", "type": "string" }, - "class": { "description": "The name of the scheduler class to use.", "$comment": "required", "enum": [ "mlos_bench.schedulers.SyncScheduler", - "mlos_bench.schedulers.sync_scheduler.SyncScheduler" + "mlos_bench.schedulers.sync_scheduler.SyncScheduler", + "mlos_bench.schedulers.ParallelScheduler", + "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler" ] }, - "config": { "description": "The scheduler-specific config.", "$comment": "Stub for scheduler-specific config appended with condition statements below", @@ -73,8 +75,9 @@ "minProperties": 1 } }, - "required": ["class"], - + "required": [ + "class" + ], "oneOf": [ { "$comment": "extensions to the 'config' object properties when synchronous scheduler is being used", @@ -83,17 +86,25 @@ "class": { "enum": [ "mlos_bench.schedulers.SyncScheduler", - "mlos_bench.schedulers.sync_scheduler.SyncScheduler" + "mlos_bench.schedulers.sync_scheduler.SyncScheduler", + "mlos_bench.schedulers.ParallelScheduler", + "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler" ] } }, - "required": ["class"] + "required": [ + "class" + ] }, "then": { "properties": { "config": { "type": "object", - "allOf": [{ "$ref": "#/$defs/config_base_scheduler" }], + "allOf": [ + { + "$ref": "#/$defs/config_base_scheduler" + } + ], "$comment": "disallow other properties", 
"unevaluatedProperties": false } diff --git a/mlos_bench/mlos_bench/schedulers/__init__.py b/mlos_bench/mlos_bench/schedulers/__init__.py index 381261e53da..fd381612be7 100644 --- a/mlos_bench/mlos_bench/schedulers/__init__.py +++ b/mlos_bench/mlos_bench/schedulers/__init__.py @@ -5,9 +5,11 @@ """Interfaces and implementations of the optimization loop scheduling policies.""" from mlos_bench.schedulers.base_scheduler import Scheduler +from mlos_bench.schedulers.parallel_scheduler import ParallelScheduler from mlos_bench.schedulers.sync_scheduler import SyncScheduler __all__ = [ "Scheduler", "SyncScheduler", + "ParallelScheduler", ] diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py new file mode 100644 index 00000000000..5393cb1e933 --- /dev/null +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -0,0 +1,145 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +"""A simple single-threaded synchronous optimization loop implementation.""" + +import copy +from datetime import datetime +import logging +import threading +from typing import Callable, Optional + +from pytz import UTC + +import logging +from datetime import datetime + +from pytz import UTC + +from mlos_bench.storage.base_storage import Storage + +from mlos_bench.schedulers.base_scheduler import Scheduler +from mlos_bench.storage.base_storage import Storage +import time + +_LOG = logging.getLogger(__name__) + +class ParallelScheduler(Scheduler): + """A simple multi-threaded asynchronous optimization loop implementation.""" + def start(self) -> None: + """Start the optimization loop.""" + super().start() + + self._idle_runners: set[int] = set(self._trial_runners.keys()) + self._busy_runners: set[int] = set() + self._scheduled_trials: set[int] = set() + self._pending_updates: list[Callable[[], None]] = [] + self._pending_threads: list[threading.Thread] = [] + self._runner_lock: threading.Lock = 
threading.Lock() + + is_warm_up: bool = self.optimizer.supports_preload + if not is_warm_up: + _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) + + not_done: bool = True + while not_done: + _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) + self._run_schedule(is_warm_up) + not_done = self._schedule_new_optimizer_suggestions() + is_warm_up = False + + # Wait for all pending runners to finish + while len(self._busy_runners) > 0: + with self._runner_lock: + for update in self._pending_updates: + update() + time.sleep(1) + + def _run_schedule(self, running: bool = False) -> None: + """ + Scheduler part of the loop. + + Check for pending trials in the queue and run them. + """ + assert self.experiment is not None + # Collect all pending trials + # It is critical that we filter out trials that are already assigned to a trial runner + # If we do not filter out these trials, it will cause configurations to be double scheduled + # and will cause the storage backend to fail. + pending_trials: list[Storage.Trial] = [ + t + for t in self.experiment.pending_trials(datetime.now(UTC), running=running) + if t.trial_id not in self._scheduled_trials + ] + + for trial in pending_trials: + # Wait for an idle trial runner + trial_runner_id: Optional[int] = None + while trial_runner_id is None: + with self._runner_lock: + for update in self._pending_updates: + update() + self._pending_updates.clear() + if len(self._idle_runners) > 0: + # Schedule a Trial to a Trial Runner + trial_runner_id = self._idle_runners.pop() + self._busy_runners.add(trial_runner_id) + self._scheduled_trials.add(trial.trial_id) + + # Assign the trial to the trial runner. Note that this will be reset + # if pending_trials is queried again from the experiment + trial.set_trial_runner(trial_runner_id) + self.run_trial(trial) + + if trial_runner_id is None: + # Sleep for a short time if failed to find to prevent busy wait + _LOG.debug("No idle trial runners available. 
Waiting...") + time.sleep(1) + + def async_run_trial(self, trial: Storage.Trial) -> None: + """ + Run a single trial in the background. + + Parameters + ---------- + trial : Storage.Trial + A Storage class based Trial used to persist the experiment trial data. + """ + register_fn = self.get_trial_runner(trial)._run_trial( + trial, copy.copy(self.global_config) + ) + + def callback(trial: Storage.Trial = trial): + """ + Callback to pass to the main thread to register the results with the storage. + + Parameters + ---------- + trial : Storage.Trial + A Storage class based Trial used to persist the experiment trial data. + """ + assert trial.trial_runner_id is not None, "Trial runner ID should not be None" + + register_fn() + self._busy_runners.remove(trial.trial_runner_id) + self._idle_runners.add(trial.trial_runner_id) + + with self._runner_lock: + self._pending_updates.append(callback) + + def run_trial(self, trial: Storage.Trial) -> None: + """ + Schedule a single trial to be run in the background. + + Parameters + ---------- + trial : Storage.Trial + A Storage class based Trial used to persist the experiment trial data. 
+ """ + super().run_trial(trial) + trial_runner = self.get_trial_runner(trial) + + thread = threading.Thread(target=self.async_run_trial, args=(trial,)) + self._pending_threads.append(thread) + thread.start() diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 80eb696bc6d..6c39279d434 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -5,6 +5,7 @@ """Simple class to run an individual Trial on a given Environment.""" import logging +from collections.abc import Callable from datetime import datetime from types import TracebackType from typing import Any, Literal @@ -164,14 +165,14 @@ def is_running(self) -> bool: """Get the running state of the current TrialRunner.""" return self._is_running - def run_trial( + def _run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, - ) -> None: + ) -> Callable[[], None]: """ - Run a single trial on this TrialRunner's Environment and stores the results in - the backend Trial Storage. + Run a single trial on this TrialRunner's Environment and return a callback to + store the results in the backend Trial Storage. Parameters ---------- @@ -182,8 +183,8 @@ def run_trial( Returns ------- - (trial_status, trial_score) : (Status, dict[str, float] | None) - Status and results of the trial. + callback : Callable[[], None] + Returns a callback to register the results with the storage backend. """ assert self._in_context @@ -199,8 +200,18 @@ def run_trial( _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) # FIXME: Use the actual timestamp from the environment. _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, Status.FAILED) - trial.update(Status.FAILED, datetime.now(UTC)) - return + + def fail_callback() -> None: + """ + A callback to register the results with the storage backend. + + This must be called from the main thread. 
For a synchronous scheduler + this can just be called directly. For an asynchronous scheduler, this + will be passed to the main thread when the trial is finished. + """ + trial.update(Status.FAILED, datetime.now(UTC)) + + return fail_callback # TODO: start background status polling of the environments in the event loop. @@ -214,13 +225,63 @@ def run_trial( # Use the status and timestamp from `.run()` as it is the final status of the experiment. # TODO: Use the `.status()` output in async mode. - trial.update_telemetry(status, timestamp, telemetry) - - trial.update(status, timestamp, results) - _LOG.info("TrialRunner: Update trial results: %s :: %s %s", trial, status, results) + def success_callback() -> None: + """ + A callback to register the results with the storage backend. + + This must be called from the main thread. For a synchronous scheduler this + can just be called directly. For an asynchronous scheduler, this will be + passed to the main thread when the trial is finished. + """ + trial.update_telemetry(status, timestamp, telemetry) + trial.update(status, timestamp, results) + _LOG.info("TrialRunner: Update trial results: %s :: %s %s", trial, status, results) self._is_running = False + return success_callback + + def run_trial( + self, + trial: Storage.Trial, + global_config: dict[str, Any] | None = None, + ) -> None: + """ + Run a single trial on this TrialRunner's Environment and store the results in + the backend Trial Storage. + + Parameters + ---------- + trial : Storage.Trial + A Storage class based Trial used to persist the experiment trial data. + global_config : dict + Global configuration parameters. + """ + self._run_trial(trial, global_config)() + + async def async_run_trial( + self, + trial: Storage.Trial, + global_config: dict[str, Any] | None = None, + ) -> Callable[[], None]: + """ + Run a single trial on this TrialRunner's Environment and return a callback to + store the results in the backend Trial Storage. 
+ + Parameters + ---------- + trial : Storage.Trial + A Storage class based Trial used to persist the experiment trial data. + global_config : dict + Global configuration parameters. + + Returns + ------- + callback : Callable[[], None] + Returns a callback to register the results with the storage backend. + """ + return self._run_trial(trial, global_config) + def teardown(self) -> None: """ Tear down the Environment. diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc new file mode 100644 index 00000000000..4ea6bdbf170 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-bad-repeat.jsonc @@ -0,0 +1,6 @@ +{ + "class": "mlos_bench.schedulers.ParallelScheduler", + "config": { + "trial_config_repeat_count": 0 + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc new file mode 100644 index 00000000000..06729a4f368 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/invalid/parallel_sched-empty-config.jsonc @@ -0,0 +1,5 @@ +{ + "class": "mlos_bench.schedulers.ParallelScheduler", + "config": { + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc new file mode 100644 index 00000000000..68623ee611f --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/bad/unhandled/parallel_sched-extra.jsonc @@ -0,0 +1,6 @@ +{ + "class": "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler", + "config": { + "extra": 
"unsupported" + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc new file mode 100644 index 00000000000..90bac645032 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/full/parallel_sched-full.jsonc @@ -0,0 +1,12 @@ +{ + "$schema": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/schedulers/scheduler-schema.json", + "class": "mlos_bench.schedulers.parallel_scheduler.ParallelScheduler", + "config": { + "trial_config_repeat_count": 3, + "teardown": false, + "experiment_id": "MyExperimentName", + "config_id": 1, + "trial_id": 1, + "max_trials": 100 + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc new file mode 100644 index 00000000000..1b0e39c3305 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/schedulers/test-cases/good/partial/parallel_sched-partial.jsonc @@ -0,0 +1,7 @@ +{ + "class": "mlos_bench.schedulers.ParallelScheduler", + "config": { + "trial_config_repeat_count": 3, + "teardown": false + } +} diff --git a/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py b/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py index 6294ee8bf3b..b84245732f5 100644 --- a/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py +++ b/mlos_bench/mlos_bench/tests/launcher_parse_args_test.py @@ -18,7 +18,7 @@ from mlos_bench.launcher import Launcher from mlos_bench.optimizers import MlosCoreOptimizer, OneShotOptimizer from mlos_bench.os_environ import environ -from mlos_bench.schedulers import SyncScheduler +from mlos_bench.schedulers import ParallelScheduler, SyncScheduler from mlos_bench.services.types import ( 
SupportsAuth, SupportsConfigLoading, @@ -307,5 +307,67 @@ def test_launcher_args_parse_3(config_paths: list[str]) -> None: assert launcher.scheduler.trial_config_repeat_count == 2 +def test_launcher_args_parse_4(config_paths: list[str]) -> None: + """ + Test that using multiple --globals arguments works and that multiple space separated + options to --config-paths works. + + Check $var expansion and Environment loading. + """ + # Here we have multiple paths following --config-paths and --service. + cli_args = ( + "--config-paths " + + " ".join(config_paths) + + " --num-trial-runners 5" + + " --service services/remote/mock/mock_auth_service.jsonc" + " services/remote/mock/mock_remote_exec_service.jsonc" + " --scheduler schedulers/parallel_scheduler.jsonc" + f" --environment {ENV_CONF_PATH}" + " --globals globals/global_test_config.jsonc" + " --globals globals/global_test_extra_config.jsonc" + " --test_global_value_2 from-args" + ) + launcher = _get_launcher(__name__, cli_args) + # Check some additional features of the the parent service + assert isinstance(launcher.service, SupportsAuth) # from --service + assert isinstance(launcher.service, SupportsRemoteExec) # from --service + # Check that the first --globals file is loaded and $var expansion is handled. + assert launcher.global_config["experiment_id"] == "MockExperiment" + assert launcher.global_config["testVmName"] == "MockExperiment-vm" + # Check that secondary expansion also works. + assert launcher.global_config["testVnetName"] == "MockExperiment-vm-vnet" + # Check that the second --globals file is loaded. + assert launcher.global_config["test_global_value"] == "from-file" + # Check overriding values in a file from the command line. + assert launcher.global_config["test_global_value_2"] == "from-args" + # Check that we can expand a $var in a config file that references an environment variable. 
+ assert path_join(launcher.global_config["pathVarWithEnvVarRef"], abs_path=True) == path_join( + os.getcwd(), "foo", abs_path=True + ) + assert launcher.global_config["varWithEnvVarRef"] == f"user:{getuser()}" + assert launcher.teardown + # Make sure we have the right number of trial runners. + assert len(launcher.trial_runners) == 5 # from cli args + # Check that the environment that got loaded looks to be of the right type. + env_config = launcher.config_loader.load_config(ENV_CONF_PATH, ConfigSchema.ENVIRONMENT) + assert env_config["class"] == "mlos_bench.environments.mock_env.MockEnv" + # All TrialRunners should get the same Environment. + assert all( + check_class_name(trial_runner.environment, env_config["class"]) + for trial_runner in launcher.trial_runners + ) + # Check that the optimizer looks right. + assert isinstance(launcher.optimizer, OneShotOptimizer) + # Check that the optimizer got initialized with defaults. + assert launcher.optimizer.tunable_params.is_defaults() + assert launcher.optimizer.max_suggestions == 1 # value for OneShotOptimizer + # Check that we pick up the right scheduler config: + assert isinstance(launcher.scheduler, ParallelScheduler) + assert ( + launcher.scheduler.trial_config_repeat_count == 3 + ) # from the custom sync_scheduler.jsonc config + assert launcher.scheduler.max_trials == -1 + + if __name__ == "__main__": pytest.main([__file__, "-n0"]) diff --git a/mlos_bench/mlos_bench/tests/storage/conftest.py b/mlos_bench/mlos_bench/tests/storage/conftest.py index a1437052823..212bf4acd4c 100644 --- a/mlos_bench/mlos_bench/tests/storage/conftest.py +++ b/mlos_bench/mlos_bench/tests/storage/conftest.py @@ -16,5 +16,7 @@ exp_no_tunables_storage = sql_storage_fixtures.exp_no_tunables_storage mixed_numerics_exp_storage = sql_storage_fixtures.mixed_numerics_exp_storage exp_data = sql_storage_fixtures.exp_data +parallel_exp_data = sql_storage_fixtures.parallel_exp_data + exp_no_tunables_data = sql_storage_fixtures.exp_no_tunables_data 
mixed_numerics_exp_data = sql_storage_fixtures.mixed_numerics_exp_data diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index cb83bffd4ff..3cec974fcf5 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -4,12 +4,13 @@ # """Test fixtures for mlos_bench storage.""" -from collections.abc import Generator +from collections.abc import Callable, Generator from random import seed as rand_seed import pytest from mlos_bench.optimizers.mock_optimizer import MockOptimizer +from mlos_bench.schedulers.parallel_scheduler import ParallelScheduler from mlos_bench.schedulers.sync_scheduler import SyncScheduler from mlos_bench.schedulers.trial_runner import TrialRunner from mlos_bench.services.config_persistence import ConfigPersistenceService @@ -109,6 +110,94 @@ def mixed_numerics_exp_storage( assert not exp._in_context +def _parallel_dummy_run_exp( + storage: SqlStorage, + exp: SqlStorage.Experiment, +) -> ExperimentData: + """ + Generates data by doing a simulated run of the given experiment. + + Parameters + ---------- + storage : SqlStorage + The storage object to use. + exp : SqlStorage.Experiment + The experiment to "run". + Note: this particular object won't be updated, but a new one will be created + from its metadata. + + Returns + ------- + ExperimentData + The data generated by the simulated run. 
+ """ + # pylint: disable=too-many-locals + + rand_seed(SEED) + + trial_runners: list[TrialRunner] = [] + global_config: dict = {} + config_loader = ConfigPersistenceService() + tunable_params = ",".join(f'"{name}"' for name in exp.tunables.get_covariant_group_names()) + mock_env_json = f""" + {{ + "class": "mlos_bench.environments.mock_env.MockEnv", + "name": "Test Env", + "config": {{ + "tunable_params": [{tunable_params}], + "mock_env_seed": {SEED}, + "mock_env_range": [60, 120], + "mock_env_metrics": ["score"] + }} + }} + """ + trial_runners = TrialRunner.create_from_json( + config_loader=config_loader, + global_config=global_config, + tunable_groups=exp.tunables, + env_json=mock_env_json, + svcs_json=None, + num_trial_runners=TRIAL_RUNNER_COUNT, + ) + + opt = MockOptimizer( + tunables=exp.tunables, + config={ + "optimization_targets": exp.opt_targets, + "seed": SEED, + # This should be the default, so we leave it omitted for now to test the default. + # But the test logic relies on this (e.g., trial 1 is config 1 is the + # default values for the tunable params) + # "start_with_defaults": True, + "max_suggestions": MAX_TRIALS, + }, + global_config=global_config, + ) + + scheduler = ParallelScheduler( + # All config values can be overridden from global config + config={ + "experiment_id": exp.experiment_id, + "trial_id": exp.trial_id, + "config_id": -1, + "trial_config_repeat_count": CONFIG_TRIAL_REPEAT_COUNT, + "max_trials": MAX_TRIALS, + }, + global_config=global_config, + trial_runners=trial_runners, + optimizer=opt, + storage=storage, + root_env_config=exp.root_env_config, + ) + + # Add some trial data to that experiment by "running" it. 
+ with scheduler: + scheduler.start() + scheduler.teardown() + + return storage.experiments[exp.experiment_id] + + def _dummy_run_exp( storage: SqlStorage, exp: SqlStorage.Experiment, @@ -197,13 +286,49 @@ def _dummy_run_exp( return storage.experiments[exp.experiment_id] +def _exp_data( + storage: SqlStorage, + exp_storage: SqlStorage.Experiment, + run_exp: Callable[[SqlStorage, SqlStorage.Experiment], ExperimentData] = _dummy_run_exp, +) -> ExperimentData: + """Test fixture for ExperimentData.""" + return run_exp(storage, exp_storage) + + +def _exp_no_tunables_data( + storage: SqlStorage, + exp_no_tunables_storage: SqlStorage.Experiment, + run_exp: Callable[[SqlStorage, SqlStorage.Experiment], ExperimentData] = _dummy_run_exp, +) -> ExperimentData: + """Test fixture for ExperimentData with no tunable configs.""" + return run_exp(storage, exp_no_tunables_storage) + + +def _mixed_numerics_exp_data( + storage: SqlStorage, + mixed_numerics_exp_storage: SqlStorage.Experiment, + run_exp: Callable[[SqlStorage, SqlStorage.Experiment], ExperimentData] = _dummy_run_exp, +) -> ExperimentData: + """Test fixture for ExperimentData with mixed numerical tunable types.""" + return run_exp(storage, mixed_numerics_exp_storage) + + @pytest.fixture def exp_data( storage: SqlStorage, exp_storage: SqlStorage.Experiment, ) -> ExperimentData: """Test fixture for ExperimentData.""" - return _dummy_run_exp(storage, exp_storage) + return _exp_data(storage, exp_storage) + + +@pytest.fixture +def parallel_exp_data( + storage: SqlStorage, + exp_storage: SqlStorage.Experiment, +) -> ExperimentData: + """Test fixture for ExperimentData with parallel scheduling.""" + return _exp_data(storage, exp_storage, run_exp=_parallel_dummy_run_exp) @pytest.fixture @@ -212,7 +337,7 @@ def exp_no_tunables_data( exp_no_tunables_storage: SqlStorage.Experiment, ) -> ExperimentData: """Test fixture for ExperimentData with no tunable configs.""" - return _dummy_run_exp(storage, exp_no_tunables_storage) + return 
_exp_no_tunables_data(storage, exp_no_tunables_storage) @pytest.fixture @@ -221,4 +346,4 @@ def mixed_numerics_exp_data( mixed_numerics_exp_storage: SqlStorage.Experiment, ) -> ExperimentData: """Test fixture for ExperimentData with mixed numerical tunable types.""" - return _dummy_run_exp(storage, mixed_numerics_exp_storage) + return _mixed_numerics_exp_data(storage, mixed_numerics_exp_storage) diff --git a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py index a1ab74f9f5c..98d244160fc 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_schedule_test.py @@ -3,14 +3,17 @@ # Licensed under the MIT License. # """Unit tests for scheduling trials for some future time.""" -from collections.abc import Iterator +from collections.abc import Callable, Iterator from datetime import datetime, timedelta +from typing import Any +import numpy as np from pytz import UTC from mlos_bench.environments.status import Status from mlos_bench.storage.base_experiment_data import ExperimentData from mlos_bench.storage.base_storage import Storage +from mlos_bench.storage.base_trial_data import TrialData from mlos_bench.tests.storage import ( CONFIG_COUNT, CONFIG_TRIAL_REPEAT_COUNT, @@ -156,3 +159,29 @@ def test_rr_scheduling(exp_data: ExperimentData) -> None: assert ( trial.trial_runner_id == expected_runner_id ), f"Expected trial_runner_id {expected_runner_id} for {trial}" + + +def test_parallel_scheduling(parallel_exp_data: ExperimentData) -> None: + """ + Checks that the scheduler schedules all of Trials across TrialRunners. + + Note that we can no longer assume the order of the trials, since they can complete + in any order. 
+ """ + extractor: Callable[[Callable[[TrialData], Any]], list[Any]] = lambda fn: [ + fn(parallel_exp_data.trials[id]) + for id in range(1, CONFIG_COUNT * CONFIG_TRIAL_REPEAT_COUNT + 1) + ] + + trial_ids = extractor(lambda trial: trial.trial_id) + assert set(trial_ids) == set(range(1, CONFIG_COUNT * CONFIG_TRIAL_REPEAT_COUNT + 1)) + + config_ids = extractor(lambda trial: trial.tunable_config_id) + unique_config_ids, config_counts = np.unique(config_ids, return_counts=True) + assert len(unique_config_ids) == CONFIG_COUNT + assert all(count == CONFIG_TRIAL_REPEAT_COUNT for count in config_counts) + + repeat_nums = extractor(lambda trial: trial.metadata_dict["repeat_i"]) + unique_repeat_nums, repeat_nums_counts = np.unique(repeat_nums, return_counts=True) + assert len(unique_repeat_nums) == CONFIG_TRIAL_REPEAT_COUNT + assert all(count == CONFIG_COUNT for count in repeat_nums_counts) From 1373ff19d7ea3e3b239e0afacb69ed88a3840c6a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 23 Apr 2025 15:46:07 +0000 Subject: [PATCH 02/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../schedulers/parallel_scheduler.py | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 5393cb1e933..ecf9edb07e5 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -5,28 +5,24 @@ """A simple single-threaded synchronous optimization loop implementation.""" import copy -from datetime import datetime import logging import threading -from typing import Callable, Optional - -from pytz import UTC - -import logging +import time +from collections.abc import Callable from datetime import datetime +from typing import Optional from pytz import UTC -from 
mlos_bench.storage.base_storage import Storage - from mlos_bench.schedulers.base_scheduler import Scheduler from mlos_bench.storage.base_storage import Storage -import time _LOG = logging.getLogger(__name__) + class ParallelScheduler(Scheduler): """A simple multi-threaded asynchronous optimization loop implementation.""" + def start(self) -> None: """Start the optimization loop.""" super().start() @@ -75,7 +71,7 @@ def _run_schedule(self, running: bool = False) -> None: for trial in pending_trials: # Wait for an idle trial runner - trial_runner_id: Optional[int] = None + trial_runner_id: int | None = None while trial_runner_id is None: with self._runner_lock: for update in self._pending_updates: @@ -106,13 +102,12 @@ def async_run_trial(self, trial: Storage.Trial) -> None: trial : Storage.Trial A Storage class based Trial used to persist the experiment trial data. """ - register_fn = self.get_trial_runner(trial)._run_trial( - trial, copy.copy(self.global_config) - ) + register_fn = self.get_trial_runner(trial)._run_trial(trial, copy.copy(self.global_config)) def callback(trial: Storage.Trial = trial): """ - Callback to pass to the main thread to register the results with the storage. + Callback to pass to the main thread to register the results with the + storage. 
Parameters ---------- From b45c71eebbbfae6cc75748ba7679ca384bfe95e1 Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Thu, 24 Apr 2025 14:03:26 -0500 Subject: [PATCH 03/14] alternative implementation for threads --- mlos_bench/mlos_bench/environments/status.py | 8 + .../schedulers/parallel_scheduler.py | 164 +++++++++--------- .../mlos_bench/schedulers/trial_runner.py | 2 +- mlos_bench/mlos_bench/storage/base_storage.py | 23 +++ .../mlos_bench/storage/sql/experiment.py | 17 +- 5 files changed, 124 insertions(+), 90 deletions(-) diff --git a/mlos_bench/mlos_bench/environments/status.py b/mlos_bench/mlos_bench/environments/status.py index ca35b3473da..066b659f154 100644 --- a/mlos_bench/mlos_bench/environments/status.py +++ b/mlos_bench/mlos_bench/environments/status.py @@ -18,6 +18,7 @@ class Status(enum.Enum): CANCELED = 5 FAILED = 6 TIMED_OUT = 7 + SCHEDULED = 8 def is_good(self) -> bool: """Check if the status of the benchmark/environment is good.""" @@ -26,6 +27,7 @@ def is_good(self) -> bool: Status.READY, Status.RUNNING, Status.SUCCEEDED, + Status.SCHEDULED, } def is_completed(self) -> bool: @@ -74,3 +76,9 @@ def is_timed_out(self) -> bool: TIMED_OUT. """ return self == Status.FAILED + + def is_scheduled(self) -> bool: + """Check if the status of the benchmark/environment Trial or Experiment is + SCHEDULED. 
+ """ + return self == Status.SCHEDULED diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index ecf9edb07e5..2e56b068316 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -4,18 +4,19 @@ # """A simple single-threaded synchronous optimization loop implementation.""" -import copy +import asyncio import logging -import threading -import time from collections.abc import Callable +from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime -from typing import Optional +from typing import Any from pytz import UTC +from mlos_bench.environments.status import Status from mlos_bench.schedulers.base_scheduler import Scheduler from mlos_bench.storage.base_storage import Storage +from mlos_bench.tunables.tunable_groups import TunableGroups _LOG = logging.getLogger(__name__) @@ -23,17 +24,15 @@ class ParallelScheduler(Scheduler): """A simple multi-threaded asynchronous optimization loop implementation.""" + def __init__(self, *args: Any, **kwargs: Any) -> None: + + super().__init__(*args, **kwargs) + self.pool = ThreadPoolExecutor(max_workers=len(self._trial_runners)) + def start(self) -> None: """Start the optimization loop.""" super().start() - self._idle_runners: set[int] = set(self._trial_runners.keys()) - self._busy_runners: set[int] = set() - self._scheduled_trials: set[int] = set() - self._pending_updates: list[Callable[[], None]] = [] - self._pending_threads: list[threading.Thread] = [] - self._runner_lock: threading.Lock = threading.Lock() - is_warm_up: bool = self.optimizer.supports_preload if not is_warm_up: _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) @@ -41,16 +40,34 @@ def start(self) -> None: not_done: bool = True while not_done: _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) + self._run_callbacks() self._run_schedule(is_warm_up) 
not_done = self._schedule_new_optimizer_suggestions() is_warm_up = False - # Wait for all pending runners to finish - while len(self._busy_runners) > 0: - with self._runner_lock: - for update in self._pending_updates: - update() - time.sleep(1) + def teardown(self) -> None: + """Stop the optimization loop.""" + super().teardown() + self.pool.shutdown(wait=True) + self._run_callbacks() + + def schedule_trial(self, tunables: TunableGroups) -> None: + """Assign a trial to a trial runner.""" + assert self.experiment is not None + + super().schedule_trial(tunables) + + pending_trials: list[Storage.Trial] = list( + self.experiment.pending_trials(datetime.now(UTC), running=False) + ) + + idle_runner_ids = [ + id for id, runner in self.trial_runners.items() if not runner.is_running + ] + + for trial, runner_id in zip(pending_trials, idle_runner_ids): + trial.update(status=Status.SCHEDULED, timestamp=datetime.now(UTC)) + trial.set_trial_runner(runner_id) def _run_schedule(self, running: bool = False) -> None: """ @@ -59,82 +76,63 @@ def _run_schedule(self, running: bool = False) -> None: Check for pending trials in the queue and run them. """ assert self.experiment is not None - # Collect all pending trials - # It is critical that we filter out trials that are already assigned to a trial runner - # If we do not filter out these trials, it will cause configurations to be double scheduled - # and will cause the storage backend to fail. 
- pending_trials: list[Storage.Trial] = [ - t - for t in self.experiment.pending_trials(datetime.now(UTC), running=running) - if t.trial_id not in self._scheduled_trials - ] - for trial in pending_trials: - # Wait for an idle trial runner - trial_runner_id: int | None = None - while trial_runner_id is None: - with self._runner_lock: - for update in self._pending_updates: - update() - self._pending_updates.clear() - if len(self._idle_runners) > 0: - # Schedule a Trial to a Trial Runner - trial_runner_id = self._idle_runners.pop() - self._busy_runners.add(trial_runner_id) - self._scheduled_trials.add(trial.trial_id) - - # Assign the trial to the trial runner. Note that this will be reset - # if pending_trials is queried again from the experiment - trial.set_trial_runner(trial_runner_id) - self.run_trial(trial) - - if trial_runner_id is None: - # Sleep for a short time if failed to find to prevent busy wait - _LOG.debug("No idle trial runners available. Waiting...") - time.sleep(1) - - def async_run_trial(self, trial: Storage.Trial) -> None: - """ - Run a single trial in the background. + scheduled_trials: list[Storage.Trial] = list( + self.experiment.filter_trials_by_status(datetime.now(UTC), [Status.SCHEDULED]) + ) - Parameters - ---------- - trial : Storage.Trial - A Storage class based Trial used to persist the experiment trial data. - """ - register_fn = self.get_trial_runner(trial)._run_trial(trial, copy.copy(self.global_config)) - - def callback(trial: Storage.Trial = trial): - """ - Callback to pass to the main thread to register the results with the - storage. - - Parameters - ---------- - trial : Storage.Trial - A Storage class based Trial used to persist the experiment trial data. 
- """ - assert trial.trial_runner_id is not None, "Trial runner ID should not be None" + for trial in scheduled_trials: + trial.update(status=Status.READY, timestamp=datetime.now(UTC)) + task = self.pool.submit(self.async_run_trial, trial) + asyncio.get_event_loop().call_soon_threadsafe(self._on_trial_finished, task) - register_fn() - self._busy_runners.remove(trial.trial_runner_id) - self._idle_runners.add(trial.trial_runner_id) + @staticmethod + def _on_trial_finished(result: Future[Callable[[], None]]) -> None: + """ + Callback to be called when a trial is finished. - with self._runner_lock: - self._pending_updates.append(callback) + This must always be called from the main thread. Exceptions can also be handled + here + """ + try: + callback = result.result() + callback() + except Exception as exception: # pylint: disable=broad-except + _LOG.error("Trial failed: %s", exception) + + @staticmethod + def _run_callbacks() -> None: + """Run all pending callbacks in the main thread.""" + loop = asyncio.get_event_loop() + pending = asyncio.all_tasks(loop) + loop.run_until_complete(asyncio.gather(*pending)) def run_trial(self, trial: Storage.Trial) -> None: - """ - Schedule a single trial to be run in the background. + """Parallel Scheduler does not support run_trial. Use async_run_trial instead. Parameters ---------- trial : Storage.Trial - A Storage class based Trial used to persist the experiment trial data. + The trial to run. + + Raises + ------ + NotImplementedError + Error to indicate that this method is not supported in ParallelScheduler. + """ + raise NotImplementedError( + "ParallelScheduler does not support run_trial. Use async_run_trial instead." + ) + + def async_run_trial(self, trial: Storage.Trial) -> Callable[[], None]: + """ + Set up and run a single trial asynchronously. + + Returns a callback to save the results in the storage. """ super().run_trial(trial) + # In the sync scheduler we run each trial on its own TrialRunner in sequence. 
trial_runner = self.get_trial_runner(trial) - - thread = threading.Thread(target=self.async_run_trial, args=(trial,)) - self._pending_threads.append(thread) - thread.start() + result = trial_runner.deferred_run_trial(trial, self.global_config) + _LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner) + return result diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 6c39279d434..4dc5a558fb6 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -259,7 +259,7 @@ def run_trial( """ self._run_trial(trial, global_config)() - async def async_run_trial( + def deferred_run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index f2d393994f7..8587f87b3b6 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -307,6 +307,29 @@ def load( Trial ids, Tunable values, benchmark scores, and status of the trials. """ + @abstractmethod + def filter_trials_by_status( + self, + timestamp: datetime, + status: list[Status], + ) -> Iterator["Storage.Trial"]: + """ + Return an iterator over the pending trials that are scheduled to run on or + before the specified timestamp matching one of status listed. + + Parameters + ---------- + timestamp : datetime.datetime + The time in UTC to check for scheduled trials. + status : list[Status] + Status of the trials to filter in. + + Returns + ------- + trials : Iterator[Storage.Trial] + An iterator over the matching trials. 
+ """ + @abstractmethod def pending_trials( self, diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index eb47de7d714..eb052f6d1fc 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -235,13 +235,11 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d row._tuple() for row in cur_result.fetchall() # pylint: disable=protected-access ) - def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]: + def filter_trials_by_status( + self, timestamp: datetime, status: list[Status] + ) -> Iterator[Storage.Trial]: timestamp = utcify_timestamp(timestamp, origin="local") _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) - if running: - pending_status = [Status.PENDING.name, Status.READY.name, Status.RUNNING.name] - else: - pending_status = [Status.PENDING.name] with self._engine.connect() as conn: cur_trials = conn.execute( self._schema.trial.select().where( @@ -251,7 +249,7 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor | (self._schema.trial.c.ts_start <= timestamp) ), self._schema.trial.c.ts_end.is_(None), - self._schema.trial.c.status.in_(pending_status), + self._schema.trial.c.status.in_([s.name for s in status]), ) ) for trial in cur_trials.fetchall(): @@ -281,6 +279,13 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor config=config, ) + def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Storage.Trial]: + if running: + pending_status = [Status.PENDING, Status.READY, Status.RUNNING] + else: + pending_status = [Status.PENDING] + return self.filter_trials_by_status(timestamp=timestamp, status=pending_status) + def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int: """ Get the config ID for the given tunables. 
From a326cdfb144a0cd3939910af8999fbae1cf621f3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 24 Apr 2025 19:04:08 +0000 Subject: [PATCH 04/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mlos_bench/mlos_bench/schedulers/parallel_scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 2e56b068316..929f555f515 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -108,7 +108,8 @@ def _run_callbacks() -> None: loop.run_until_complete(asyncio.gather(*pending)) def run_trial(self, trial: Storage.Trial) -> None: - """Parallel Scheduler does not support run_trial. Use async_run_trial instead. + """ + Parallel Scheduler does not support run_trial. Use async_run_trial instead. 
Parameters ---------- From 8d346866179e973ec630a73a6ca4a533cba874cc Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Thu, 24 Apr 2025 14:05:33 -0500 Subject: [PATCH 05/14] add comments --- mlos_bench/mlos_bench/schedulers/parallel_scheduler.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 2e56b068316..af1940ce08a 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -48,6 +48,8 @@ def start(self) -> None: def teardown(self) -> None: """Stop the optimization loop.""" super().teardown() + + # Shutdown the thread pool and wait for all tasks to finish self.pool.shutdown(wait=True) self._run_callbacks() @@ -65,6 +67,7 @@ def schedule_trial(self, tunables: TunableGroups) -> None: id for id, runner in self.trial_runners.items() if not runner.is_running ] + # Assign pending trials to idle runners for trial, runner_id in zip(pending_trials, idle_runner_ids): trial.update(status=Status.SCHEDULED, timestamp=datetime.now(UTC)) trial.set_trial_runner(runner_id) @@ -83,7 +86,9 @@ def _run_schedule(self, running: bool = False) -> None: for trial in scheduled_trials: trial.update(status=Status.READY, timestamp=datetime.now(UTC)) - task = self.pool.submit(self.async_run_trial, trial) + task = self.pool.submit(self.deferred_run_trial, trial) + + # This is required to ensure that the callback happens on the main thread asyncio.get_event_loop().call_soon_threadsafe(self._on_trial_finished, task) @staticmethod @@ -124,7 +129,7 @@ def run_trial(self, trial: Storage.Trial) -> None: "ParallelScheduler does not support run_trial. Use async_run_trial instead." ) - def async_run_trial(self, trial: Storage.Trial) -> Callable[[], None]: + def deferred_run_trial(self, trial: Storage.Trial) -> Callable[[], None]: """ Set up and run a single trial asynchronously. 
From 4bbb534c06b97c28110cde797f30675f1a08525e Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Fri, 25 Apr 2025 17:26:31 -0500 Subject: [PATCH 06/14] switch from threads to processes --- .../schedulers/parallel_scheduler.py | 59 ++++++----- .../mlos_bench/schedulers/trial_runner.py | 99 ++++--------------- 2 files changed, 50 insertions(+), 108 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 31ca139f57e..03da415b8d0 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -2,12 +2,11 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # -"""A simple single-threaded synchronous optimization loop implementation.""" +"""A simple multi-threaded asynchronous optimization loop implementation.""" import asyncio import logging -from collections.abc import Callable -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import Future, ProcessPoolExecutor from datetime import datetime from typing import Any @@ -27,7 +26,7 @@ class ParallelScheduler(Scheduler): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - self.pool = ThreadPoolExecutor(max_workers=len(self._trial_runners)) + self.pool = ProcessPoolExecutor(max_workers=len(self._trial_runners)) def start(self) -> None: """Start the optimization loop.""" @@ -47,11 +46,10 @@ def start(self) -> None: def teardown(self) -> None: """Stop the optimization loop.""" - super().teardown() - # Shutdown the thread pool and wait for all tasks to finish self.pool.shutdown(wait=True) self._run_callbacks() + super().teardown() def schedule_trial(self, tunables: TunableGroups) -> None: """Assign a trial to a trial runner.""" @@ -86,24 +84,25 @@ def _run_schedule(self, running: bool = False) -> None: for trial in scheduled_trials: trial.update(status=Status.READY, 
timestamp=datetime.now(UTC)) - task = self.pool.submit(self.deferred_run_trial, trial) - - # This is required to ensure that the callback happens on the main thread - asyncio.get_event_loop().call_soon_threadsafe(self._on_trial_finished, task) - - @staticmethod - def _on_trial_finished(result: Future[Callable[[], None]]) -> None: - """ - Callback to be called when a trial is finished. - - This must always be called from the main thread. Exceptions can also be handled - here - """ - try: - callback = result.result() - callback() - except Exception as exception: # pylint: disable=broad-except - _LOG.error("Trial failed: %s", exception) + self.deferred_run_trial(trial) + + def _on_trial_finished_closure(self, trial: Storage.Trial): + def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: + """ + Callback to be called when a trial is finished. + + This must always be called from the main thread. Exceptions can also be handled + here + """ + try: + (status, timestamp, results, telemetry) = result.result() + self.get_trial_runner(trial)._finalize_run_trial( + trial, status, timestamp, results, telemetry + ) + except Exception as exception: # pylint: disable=broad-except + _LOG.error("Trial failed: %s", exception) + + return _on_trial_finished @staticmethod def _run_callbacks() -> None: @@ -130,7 +129,7 @@ def run_trial(self, trial: Storage.Trial) -> None: "ParallelScheduler does not support run_trial. Use async_run_trial instead." ) - def deferred_run_trial(self, trial: Storage.Trial) -> Callable[[], None]: + def deferred_run_trial(self, trial: Storage.Trial) -> None: """ Set up and run a single trial asynchronously. @@ -139,6 +138,12 @@ def deferred_run_trial(self, trial: Storage.Trial) -> Callable[[], None]: super().run_trial(trial) # In the sync scheduler we run each trial on its own TrialRunner in sequence. 
trial_runner = self.get_trial_runner(trial) - result = trial_runner.deferred_run_trial(trial, self.global_config) + trial_runner._prepare_run_trial(trial, self.global_config) + + task = self.pool.submit(trial_runner._execute_run_trial, trial_runner.environment) + # This is required to ensure that the callback happens on the main thread + asyncio.get_event_loop().call_soon_threadsafe( + self._on_trial_finished_closure(trial), self, task + ) + _LOG.info("QUEUE: Finished trial: %s on %s", trial, trial_runner) - return result diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 4dc5a558fb6..ed54ef10aec 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -5,7 +5,6 @@ """Simple class to run an individual Trial on a given Environment.""" import logging -from collections.abc import Callable from datetime import datetime from types import TracebackType from typing import Any, Literal @@ -14,7 +13,6 @@ from mlos_bench.environments.base_environment import Environment from mlos_bench.environments.status import Status -from mlos_bench.event_loop_context import EventLoopContext from mlos_bench.services.base_service import Service from mlos_bench.services.config_persistence import ConfigPersistenceService from mlos_bench.services.local.local_exec import LocalExecService @@ -118,7 +116,6 @@ def __init__(self, trial_runner_id: int, env: Environment) -> None: assert self._env.parameters["trial_runner_id"] == self._trial_runner_id self._in_context = False self._is_running = False - self._event_loop_context = EventLoopContext() def __repr__(self) -> str: return ( @@ -165,27 +162,11 @@ def is_running(self) -> bool: """Get the running state of the current TrialRunner.""" return self._is_running - def _run_trial( + def _prepare_run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, - ) -> Callable[[], None]: - """ - Run a 
single trial on this TrialRunner's Environment and return a callback to - store the results in the backend Trial Storage. - - Parameters - ---------- - trial : Storage.Trial - A Storage class based Trial used to persist the experiment trial data. - global_config : dict - Global configuration parameters. - - Returns - ------- - callback : Callable[[], None] - Returns a callback to register the results with the storage backend. - """ + ): assert self._in_context assert not self._is_running @@ -197,49 +178,26 @@ def _run_trial( ) if not self.environment.setup(trial.tunables, trial.config(global_config)): - _LOG.warning("Setup failed: %s :: %s", self.environment, trial.tunables) - # FIXME: Use the actual timestamp from the environment. - _LOG.info("TrialRunner: Update trial results: %s :: %s", trial, Status.FAILED) - - def fail_callback() -> None: - """ - A callback to register the results with the storage backend. - - This must be called from the main thread. For a synchronous scheduler - this can just be called directly. For an asynchronous scheduler, this - will be passed to the main thread when the trial is finished. - """ - trial.update(Status.FAILED, datetime.now(UTC)) - - return fail_callback - - # TODO: start background status polling of the environments in the event loop. + trial.update(Status.FAILED, datetime.now(UTC)) + @staticmethod + def _execute_run_trial( + environment: Environment, + ): # Block and wait for the final result. - (status, timestamp, results) = self.environment.run() - _LOG.info("TrialRunner Results: %s :: %s\n%s", trial.tunables, status, results) + (status, timestamp, results) = environment.run() # In async mode (TODO), poll the environment for status and telemetry # and update the storage with the intermediate results. - (_status, _timestamp, telemetry) = self.environment.status() - - # Use the status and timestamp from `.run()` as it is the final status of the experiment. - # TODO: Use the `.status()` output in async mode. 
- def success_callback() -> None: - """ - A callback to register the results with the storage backend. - - This must be called from the main thread. For a synchronous scheduler this - can just be called directly. For an asynchronous scheduler, this will be - passed to the main thread when the trial is finished. - """ - trial.update_telemetry(status, timestamp, telemetry) - trial.update(status, timestamp, results) - _LOG.info("TrialRunner: Update trial results: %s :: %s %s", trial, status, results) + (_status, _timestamp, telemetry) = environment.status() - self._is_running = False + return (status, timestamp, results, telemetry) - return success_callback + def _finalize_run_trial(self, trial, status, timestamp, results, telemetry): + trial.update_telemetry(status, timestamp, telemetry) + trial.update(status, timestamp, results) + _LOG.info("TrialRunner: Update trial results: %s :: %s %s", trial, status, results) + self._is_running = False def run_trial( self, @@ -257,30 +215,9 @@ def run_trial( global_config : dict Global configuration parameters. """ - self._run_trial(trial, global_config)() - - def deferred_run_trial( - self, - trial: Storage.Trial, - global_config: dict[str, Any] | None = None, - ) -> Callable[[], None]: - """ - Run a single trial on this TrialRunner's Environment and return a callback to - store the results in the backend Trial Storage. - - Parameters - ---------- - trial : Storage.Trial - A Storage class based Trial used to persist the experiment trial data. - global_config : dict - Global configuration parameters. - - Returns - ------- - callback : Callable[[], None] - Returns a callback to register the results with the storage backend. 
- """ - return self._run_trial(trial, global_config) + self._prepare_run_trial(trial, global_config) + (status, timestamp, results, telemetry) = self._execute_run_trial(self._env) + self._finalize_run_trial(trial, status, timestamp, results, telemetry) def teardown(self) -> None: """ From 123ecd3b7331fe177c4c59ae441d6445a8570938 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 25 Apr 2025 22:26:54 +0000 Subject: [PATCH 07/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mlos_bench/mlos_bench/schedulers/parallel_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 03da415b8d0..bc1b7372c2e 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -91,8 +91,8 @@ def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: """ Callback to be called when a trial is finished. - This must always be called from the main thread. Exceptions can also be handled - here + This must always be called from the main thread. 
Exceptions can also be + handled here """ try: (status, timestamp, results, telemetry) = result.result() From 3357fcfb2bc262c90315fbe1fdb1172a85186c34 Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Fri, 25 Apr 2025 17:30:18 -0500 Subject: [PATCH 08/14] Update mlos_bench/mlos_bench/schedulers/parallel_scheduler.py Co-authored-by: Brian Kroth --- mlos_bench/mlos_bench/schedulers/parallel_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index bc1b7372c2e..4be62c32347 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -21,7 +21,7 @@ class ParallelScheduler(Scheduler): - """A simple multi-threaded asynchronous optimization loop implementation.""" + """A simple multi-process asynchronous optimization loop implementation.""" def __init__(self, *args: Any, **kwargs: Any) -> None: From 7a656296c9603f1923f2ce57dde4f6a46fe1fa24 Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Fri, 25 Apr 2025 17:33:24 -0500 Subject: [PATCH 09/14] Update mlos_bench/mlos_bench/storage/sql/experiment.py Co-authored-by: Brian Kroth --- mlos_bench/mlos_bench/storage/sql/experiment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index eb052f6d1fc..06826245e1a 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -236,7 +236,7 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d ) def filter_trials_by_status( - self, timestamp: datetime, status: list[Status] + self, timestamp: datetime, status: list[Status], ) -> Iterator[Storage.Trial]: timestamp = utcify_timestamp(timestamp, origin="local") _LOG.info("Retrieve pending trials for: %s @ %s", 
self._experiment_id, timestamp) From 2c9ecb8f63ab98419aa53c715d6e3135a88d7872 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 25 Apr 2025 22:33:43 +0000 Subject: [PATCH 10/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mlos_bench/mlos_bench/storage/sql/experiment.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index 06826245e1a..0581dd95897 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -236,7 +236,9 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d ) def filter_trials_by_status( - self, timestamp: datetime, status: list[Status], + self, + timestamp: datetime, + status: list[Status], ) -> Iterator[Storage.Trial]: timestamp = utcify_timestamp(timestamp, origin="local") _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) From 40c7f83feb3ecdc13a27013df9154cbbf2b64126 Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Fri, 25 Apr 2025 17:35:04 -0500 Subject: [PATCH 11/14] updates for comments --- mlos_bench/mlos_bench/schedulers/parallel_scheduler.py | 4 ++-- mlos_bench/mlos_bench/storage/base_storage.py | 2 +- mlos_bench/mlos_bench/storage/sql/experiment.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 03da415b8d0..bc1b7372c2e 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -91,8 +91,8 @@ def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: """ Callback to be called when a trial is finished. 
- This must always be called from the main thread. Exceptions can also be handled - here + This must always be called from the main thread. Exceptions can also be + handled here """ try: (status, timestamp, results, telemetry) = result.result() diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index 8587f87b3b6..5e3400661ab 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -311,7 +311,7 @@ def load( def filter_trials_by_status( self, timestamp: datetime, - status: list[Status], + statuses: list[Status], ) -> Iterator["Storage.Trial"]: """ Return an iterator over the pending trials that are scheduled to run on or diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index eb052f6d1fc..3bfd4c258c7 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -236,7 +236,7 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d ) def filter_trials_by_status( - self, timestamp: datetime, status: list[Status] + self, timestamp: datetime, statuses: list[Status] ) -> Iterator[Storage.Trial]: timestamp = utcify_timestamp(timestamp, origin="local") _LOG.info("Retrieve pending trials for: %s @ %s", self._experiment_id, timestamp) @@ -249,7 +249,7 @@ def filter_trials_by_status( | (self._schema.trial.c.ts_start <= timestamp) ), self._schema.trial.c.ts_end.is_(None), - self._schema.trial.c.status.in_([s.name for s in status]), + self._schema.trial.c.status.in_([s.name for s in statuses]), ) ) for trial in cur_trials.fetchall(): @@ -284,7 +284,7 @@ def pending_trials(self, timestamp: datetime, *, running: bool) -> Iterator[Stor pending_status = [Status.PENDING, Status.READY, Status.RUNNING] else: pending_status = [Status.PENDING] - return self.filter_trials_by_status(timestamp=timestamp, status=pending_status) + return 
self.filter_trials_by_status(timestamp=timestamp, statuses=pending_status) def _get_config_id(self, conn: Connection, tunables: TunableGroups) -> int: """ From ee9e7e0904270167a29c37af4c068c7c9fcf1ab1 Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Fri, 25 Apr 2025 18:29:29 -0500 Subject: [PATCH 12/14] fix linting errors --- .../schedulers/parallel_scheduler.py | 19 ++++-- .../mlos_bench/schedulers/trial_runner.py | 60 ++++++++++++++++--- mlos_bench/mlos_bench/storage/base_storage.py | 4 +- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index 4be62c32347..e1b74ec7b74 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -8,12 +8,14 @@ import logging from concurrent.futures import Future, ProcessPoolExecutor from datetime import datetime +from collections.abc import Callable from typing import Any from pytz import UTC from mlos_bench.environments.status import Status from mlos_bench.schedulers.base_scheduler import Scheduler +from mlos_bench.schedulers.trial_runner import TrialRunner from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups @@ -86,7 +88,16 @@ def _run_schedule(self, running: bool = False) -> None: trial.update(status=Status.READY, timestamp=datetime.now(UTC)) self.deferred_run_trial(trial) - def _on_trial_finished_closure(self, trial: Storage.Trial): + def _on_trial_finished_closure( + self, trial: Storage.Trial + ) -> Callable[["ParallelScheduler", Future], None]: + """Generate a closure to handle the callback for when a trial is finished. + + Parameters + ---------- + trial : Storage.Trial + The trial to finish. + """ def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: """ Callback to be called when a trial is finished. 
@@ -96,7 +107,7 @@ def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: """ try: (status, timestamp, results, telemetry) = result.result() - self.get_trial_runner(trial)._finalize_run_trial( + self.get_trial_runner(trial).finalize_run_trial( trial, status, timestamp, results, telemetry ) except Exception as exception: # pylint: disable=broad-except @@ -138,9 +149,9 @@ def deferred_run_trial(self, trial: Storage.Trial) -> None: super().run_trial(trial) # In the sync scheduler we run each trial on its own TrialRunner in sequence. trial_runner = self.get_trial_runner(trial) - trial_runner._prepare_run_trial(trial, self.global_config) + trial_runner.prepare_run_trial(trial, self.global_config) - task = self.pool.submit(trial_runner._execute_run_trial, trial_runner.environment) + task = self.pool.submit(TrialRunner.execute_run_trial, trial_runner.environment) # This is required to ensure that the callback happens on the main thread asyncio.get_event_loop().call_soon_threadsafe( self._on_trial_finished_closure(trial), self, task diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index ed54ef10aec..3ff8b2c195e 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -19,6 +19,7 @@ from mlos_bench.services.types import SupportsConfigLoading from mlos_bench.storage.base_storage import Storage from mlos_bench.tunables.tunable_groups import TunableGroups +from mlos_bench.tunables.tunable_types import TunableValue _LOG = logging.getLogger(__name__) @@ -162,11 +163,20 @@ def is_running(self) -> bool: """Get the running state of the current TrialRunner.""" return self._is_running - def _prepare_run_trial( + def prepare_run_trial( self, trial: Storage.Trial, global_config: dict[str, Any] | None = None, - ): + ) -> None: + """Prepare the trial runner for running a trial. 
+ + Parameters + ---------- + trial : Storage.Trial + The trial to prepare. + global_config : dict[str, Any] | None, optional + Global configuration parameters, by default None + """ assert self._in_context assert not self._is_running @@ -181,9 +191,21 @@ def _prepare_run_trial( trial.update(Status.FAILED, datetime.now(UTC)) @staticmethod - def _execute_run_trial( + def execute_run_trial( environment: Environment, - ): + ) -> tuple[Status, datetime, dict[str, TunableValue] | None, list[tuple[datetime, str, Any]]]: + """Execute the trial run on the environment. + + Parameters + ---------- + environment : Environment + The environment to run the trial on. + + Returns + ------- + tuple[Status, datetime, Optional[dict[str, TunableValue]], list[tuple[datetime, str, Any]]] + The fill results of the trial run, including status, timestamp, results, and telemetry. + """ # Block and wait for the final result. (status, timestamp, results) = environment.run() @@ -193,7 +215,29 @@ def _execute_run_trial( return (status, timestamp, results, telemetry) - def _finalize_run_trial(self, trial, status, timestamp, results, telemetry): + def finalize_run_trial( # pylint: disable=too-many-arguments, too-many-positional-arguments + self, + trial: Storage.Trial, + status: Status, + timestamp: datetime, + results: dict[str, TunableValue] | None, + telemetry: list[tuple[datetime, str, Any]], + ) -> None: + """Finalize the trial run in the storage backend. + + Parameters + ---------- + trial : Storage.Trial + The trial to finalize. + status : Status + The status of the trial. + timestamp : datetime + The timestamp of the trial execution. + results : Optional[dict[str, TunableValue]] + The results of the trial + telemetry : list[tuple[datetime, str, Any]] + The telemetry data of the trial. 
+ """ trial.update_telemetry(status, timestamp, telemetry) trial.update(status, timestamp, results) _LOG.info("TrialRunner: Update trial results: %s :: %s %s", trial, status, results) @@ -215,9 +259,9 @@ def run_trial( global_config : dict Global configuration parameters. """ - self._prepare_run_trial(trial, global_config) - (status, timestamp, results, telemetry) = self._execute_run_trial(self._env) - self._finalize_run_trial(trial, status, timestamp, results, telemetry) + self.prepare_run_trial(trial, global_config) + (status, timestamp, results, telemetry) = self.execute_run_trial(self._env) + self.finalize_run_trial(trial, status, timestamp, results, telemetry) def teardown(self) -> None: """ diff --git a/mlos_bench/mlos_bench/storage/base_storage.py b/mlos_bench/mlos_bench/storage/base_storage.py index 5e3400661ab..87d61a6723e 100644 --- a/mlos_bench/mlos_bench/storage/base_storage.py +++ b/mlos_bench/mlos_bench/storage/base_storage.py @@ -315,13 +315,13 @@ def filter_trials_by_status( ) -> Iterator["Storage.Trial"]: """ Return an iterator over the pending trials that are scheduled to run on or - before the specified timestamp matching one of status listed. + before the specified timestamp matching one of statuses listed. Parameters ---------- timestamp : datetime.datetime The time in UTC to check for scheduled trials. - status : list[Status] + statuses : list[Status] Status of the trials to filter in. 
Returns From d6199991a67f71e87428a0594e26da4a67fc218f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 25 Apr 2025 23:29:52 +0000 Subject: [PATCH 13/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- mlos_bench/mlos_bench/schedulers/parallel_scheduler.py | 6 ++++-- mlos_bench/mlos_bench/schedulers/trial_runner.py | 9 ++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py index e1b74ec7b74..2092d5f5bb8 100644 --- a/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/parallel_scheduler.py @@ -6,9 +6,9 @@ import asyncio import logging +from collections.abc import Callable from concurrent.futures import Future, ProcessPoolExecutor from datetime import datetime -from collections.abc import Callable from typing import Any from pytz import UTC @@ -91,13 +91,15 @@ def _run_schedule(self, running: bool = False) -> None: def _on_trial_finished_closure( self, trial: Storage.Trial ) -> Callable[["ParallelScheduler", Future], None]: - """Generate a closure to handle the callback for when a trial is finished. + """ + Generate a closure to handle the callback for when a trial is finished. Parameters ---------- trial : Storage.Trial The trial to finish. """ + def _on_trial_finished(self: ParallelScheduler, result: Future) -> None: """ Callback to be called when a trial is finished. 
diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 3ff8b2c195e..9bed1049f74 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -168,7 +168,8 @@ def prepare_run_trial( trial: Storage.Trial, global_config: dict[str, Any] | None = None, ) -> None: - """Prepare the trial runner for running a trial. + """ + Prepare the trial runner for running a trial. Parameters ---------- @@ -194,7 +195,8 @@ def prepare_run_trial( def execute_run_trial( environment: Environment, ) -> tuple[Status, datetime, dict[str, TunableValue] | None, list[tuple[datetime, str, Any]]]: - """Execute the trial run on the environment. + """ + Execute the trial run on the environment. Parameters ---------- @@ -223,7 +225,8 @@ def finalize_run_trial( # pylint: disable=too-many-arguments, too-many-position results: dict[str, TunableValue] | None, telemetry: list[tuple[datetime, str, Any]], ) -> None: - """Finalize the trial run in the storage backend. + """ + Finalize the trial run in the storage backend. Parameters ---------- From 3dfb7deade284616568525537771909ba9accabf Mon Sep 17 00:00:00 2001 From: Johannes Freischuetz Date: Fri, 25 Apr 2025 21:48:31 -0500 Subject: [PATCH 14/14] try to fix the docs --- .../mlos_bench/schedulers/trial_runner.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/mlos_bench/mlos_bench/schedulers/trial_runner.py b/mlos_bench/mlos_bench/schedulers/trial_runner.py index 3ff8b2c195e..dad132423d2 100644 --- a/mlos_bench/mlos_bench/schedulers/trial_runner.py +++ b/mlos_bench/mlos_bench/schedulers/trial_runner.py @@ -174,7 +174,7 @@ def prepare_run_trial( ---------- trial : Storage.Trial The trial to prepare. 
-        global_config : dict[str, Any] | None, optional
+        global_config : dict[str, Any] | None
             Global configuration parameters, by default None
         """
         assert self._in_context
@@ -203,8 +203,13 @@ def execute_run_trial(
 
         Returns
         -------
-        tuple[Status, datetime, Optional[dict[str, TunableValue]], list[tuple[datetime, str, Any]]]
-            The fill results of the trial run, including status, timestamp, results, and telemetry.
+        tuple[
+            Status,
+            datetime.datetime,
+            dict[str, TunableValue] | None,
+            list[tuple[datetime.datetime, str, Any]]
+        ]
+            The full results of the trial run, including status, timestamp, results, and telemetry.
         """
         # Block and wait for the final result.
         (status, timestamp, results) = environment.run()
@@ -231,11 +236,11 @@ def finalize_run_trial(  # pylint: disable=too-many-arguments, too-many-position
             The trial to finalize.
         status : Status
             The status of the trial.
-        timestamp : datetime
+        timestamp : datetime.datetime
             The timestamp of the trial execution.
-        results : Optional[dict[str, TunableValue]]
+        results : dict[str, TunableValue] | None
             The results of the trial
-        telemetry : list[tuple[datetime, str, Any]]
+        telemetry : list[tuple[datetime.datetime, str, Any]]
             The telemetry data of the trial.