From cf20c93b0d13ec5ffec444265622f0318119797c Mon Sep 17 00:00:00 2001 From: OleguerCanal Date: Wed, 13 Oct 2021 17:33:48 +0200 Subject: [PATCH 1/6] Added parallel search logic --- __init__.py | 8 +++ src/__init__.py | 0 .../gaussian_process_search.py | 22 +++--- src/parallel_searcher.py | 68 +++++++++++++++++++ src/search_job_instance.py | 41 +++++++++++ utilities/__init__.py | 0 load_save.py => utilities/io.py | 0 7 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 src/__init__.py rename gaussian_process.py => src/gaussian_process_search.py (95%) create mode 100644 src/parallel_searcher.py create mode 100644 src/search_job_instance.py create mode 100644 utilities/__init__.py rename load_save.py => utilities/io.py (100%) diff --git a/__init__.py b/__init__.py index e69de29..f0a5d64 100644 --- a/__init__.py +++ b/__init__.py @@ -0,0 +1,8 @@ +import os +import sys + +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from src.gaussian_process_search import GaussianProcessSearch +from src.parallel_searcher import ParallelSearcher +from src.search_job_instance import SearchJobInstance diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gaussian_process.py b/src/gaussian_process_search.py similarity index 95% rename from gaussian_process.py rename to src/gaussian_process_search.py index 99f640f..eb77851 100644 --- a/gaussian_process.py +++ b/src/gaussian_process_search.py @@ -12,8 +12,8 @@ from skopt.plots import plot_objective, plot_evaluations from skopt import dump, load -sys.path.append(os.path.dirname(os.path.abspath(__file__))) -import load_save +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import utilities.io # Session variables session_params = {} @@ -46,7 +46,7 @@ def __init__(self, search_space, fixed_space, evaluator, input_file=None, output self.solutions = [] if input_file is not None: try: - data_dict = load_save.load(data_file=input_file) + data_dict = utilities.io.load(data_file=input_file) self.x_values, self.y_values = self._extract_values(data_dict) except OSError as e: raise OSError('Cannot read input file. \n' + str(e)) @@ -162,10 +162,16 @@ def get_next_candidate(self, n_points): optimizer = Optimizer( dimensions=self.search_space, base_estimator='gp', - n_initial_points=len(self.x_values), - acq_func='EI' + # n_initial_points=len(self.x_values), + acq_func='EI', + n_jobs=-1 ) - optimizer.tell(self.x_values, y_values) # TODO Does this fit the values??? + # x_values = np.array(self.x_values) + x_values = self.x_values + y_values = y_values + print("x:", x_values) + print("y:", y_values) + optimizer.tell(x_values, y_values, fit=True) points = optimizer.ask(n_points=n_points) return self._to_dict_list(points) @@ -263,7 +269,7 @@ def save_values(self): """ data_dict = self._pack_values() - load_save.save(self.output_file, data_dict) + utilities.io.save(self.output_file, data_dict) @staticmethod def _to_key_value(values): @@ -319,4 +325,4 @@ def save_checkpoint(self, res): res_dict[dimension.name].append(point[i]) res_dict['value'] = y_values - load_save.save(self.output_file, res_dict) + utilities.io.save(self.output_file, res_dict) diff --git a/src/parallel_searcher.py b/src/parallel_searcher.py new file mode 100644 index 0000000..b00908c --- /dev/null +++ b/src/parallel_searcher.py @@ -0,0 +1,68 @@ + +import time + +class ParallelSearcher: + def __init__(self, optimizer, job_class): + """ Instantiate parallel searching + + Args: + optimizer (GaussianProcessSearch): Optimizer used to find next points to test + job_class (SearchJobInstance): Implementation of SearchJobInstance to manage jobs + """ + self.optimizer = optimizer + self.job_class = job_class + + def optimize(self, + n_calls=10, + n_random_starts=5, + noise=0.01, + n_parallel_jobs=1, + refresh_rate=1, + first_id=0, + verbose=True, + plot_results=False): + + # Instantiate all initial jobs + instances = [self.job_class(i) for i in range(first_id, first_id + n_parallel_jobs)] + + # Get all initial candidates + candidates = [] + if len(self.optimizer.x_values) == 0: # If first points, sample random + candidates = self.optimizer.get_random_candidate(n_parallel_jobs) + else: + candidates = self.optimizer.get_next_candidate(n_parallel_jobs) + + # Launch all instances + for i in range(n_parallel_jobs): + print(candidates[i]) + instances[i].launch(**candidates[i]) + + while(True): + time.sleep(refresh_rate) # refresh rate in seconds + for i in range(n_parallel_jobs): + instance = instances[i] + if instance.done(): + instance_params = instance.passed_args + instance_result = instance.get_result() + + # Display information + print("*****") + print("Finished job:", instance.id) + print("Instance_params:", instance_params) + print("Instance_result:", instance_result) + print("*****") + + # Add point-evaluation info to the optimizer + self.optimizer.add_point_value(instance_params, instance_result) + self.optimizer.save_values() + + # Instantiate new job instance + candidate = self.optimizer.get_next_candidate(1)[0] + instances[i] = self.job_class(instance.id + n_parallel_jobs) + instances[i].launch(**candidate) + + # Display information + print("*****") + print("Starting job:", instances[i]) + print("Instance_params:", candidate) + print("*****") \ No newline at end of file diff --git a/src/search_job_instance.py b/src/search_job_instance.py new file mode 100644 index 0000000..b7d43c3 --- /dev/null +++ b/src/search_job_instance.py @@ -0,0 +1,41 @@ +from abc import ABC, abstractmethod + +class SearchJobInstance(ABC): + """Abstract class linked to a single job. + It is used to manage + """ + def __init__(self, id): + self.id = id + + @abstractmethod + def launch(self, **kwargs) -> None: + """Execute command given the objective arguments + IMPORTANT: Must be non-blocking! + """ + self.passed_args = kwargs + raise NotImplementedError + + @abstractmethod + def get_result(self) -> float: + """Return final result of the optimization + """ + raise NotImplementedError + + @abstractmethod + def done(self) -> bool: + """True if job has finished, false otherwise + IMPORTANT: Must be non-blocking! + """ + raise NotImplementedError + + @abstractmethod + def kill(self) -> None: + """Finish job + """ + raise NotImplementedError + + @abstractmethod + def end(self) -> None: + """Run any task necessary when done + """ + raise NotImplementedError \ No newline at end of file diff --git a/utilities/__init__.py b/utilities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/load_save.py b/utilities/io.py similarity index 100% rename from load_save.py rename to utilities/io.py From 81b53f1a19edeb9c9ad00d7be0cb049e13453205 Mon Sep 17 00:00:00 2001 From: OleguerCanal Date: Thu, 14 Oct 2021 11:53:50 +0200 Subject: [PATCH 2/6] Fixed bug --- src/parallel_searcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/parallel_searcher.py b/src/parallel_searcher.py index b00908c..9482a62 100644 --- a/src/parallel_searcher.py +++ b/src/parallel_searcher.py @@ -36,12 +36,14 @@ def optimize(self, for i in range(n_parallel_jobs): print(candidates[i]) instances[i].launch(**candidates[i]) + n_calls -= 1 - while(True): + while n_calls > 0: time.sleep(refresh_rate) # refresh rate in seconds for i in range(n_parallel_jobs): instance = instances[i] if instance.done(): + n_calls -= 1 instance_params = instance.passed_args instance_result = instance.get_result() From 11f94d695fa7306697e2d28d053a01b1b75ffd8c Mon Sep 17 00:00:00 2001 From: OleguerCanal Date: Thu, 14 Oct 2021 16:16:03 +0200 Subject: [PATCH 3/6] Added retrials when launching instances --- src/parallel_searcher.py | 13 ++++++++++++- src/search_job_instance.py | 5 ++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/parallel_searcher.py b/src/parallel_searcher.py index 9482a62..8261a10 100644 --- a/src/parallel_searcher.py +++ b/src/parallel_searcher.py @@ -12,6 +12,16 @@ def __init__(self, optimizer, job_class): self.optimizer = optimizer self.job_class = job_class + def __launch(self, instance, candidate, retrials=5): + launch_status = 1 + attempt = 0 + while launch_status != 0 and attempt < retrials: + launch_status = instance.launch(**candidate) + if launch_status != 0: + print("There was some error launching the instance. Retrying (" + str(attempt) + "/" + retrials + ")") + time.sleep(0.1) + attempt += 1 + def optimize(self, n_calls=10, n_random_starts=5, @@ -35,7 +45,7 @@ def optimize(self, # Launch all instances for i in range(n_parallel_jobs): print(candidates[i]) - instances[i].launch(**candidates[i]) + self.__launch(instance=instances[i], candidate=candidates[i]) n_calls -= 1 while n_calls > 0: @@ -62,6 +72,7 @@ def optimize(self, candidate = self.optimizer.get_next_candidate(1)[0] instances[i] = self.job_class(instance.id + n_parallel_jobs) instances[i].launch(**candidate) + self.__launch(instance=instances[i], candidate=candidate) # Display information print("*****") diff --git a/src/search_job_instance.py b/src/search_job_instance.py index b7d43c3..af7d883 100644 --- a/src/search_job_instance.py +++ b/src/search_job_instance.py @@ -8,9 +8,12 @@ def __init__(self, id): self.id = id @abstractmethod - def launch(self, **kwargs) -> None: + def launch(self, **kwargs) -> int: """Execute command given the objective arguments IMPORTANT: Must be non-blocking! + + Returns: + status(Int): 0 everything is ok, 1 there was some error """ self.passed_args = kwargs raise NotImplementedError From 1f8d5ba0c4719e2189795d1e1bd3df589f74f812 Mon Sep 17 00:00:00 2001 From: OleguerCanal Date: Thu, 14 Oct 2021 16:33:31 +0200 Subject: [PATCH 4/6] Fixed bug --- src/parallel_searcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/parallel_searcher.py b/src/parallel_searcher.py index 8261a10..dca74d6 100644 --- a/src/parallel_searcher.py +++ b/src/parallel_searcher.py @@ -71,7 +71,6 @@ def optimize(self, # Instantiate new job instance candidate = self.optimizer.get_next_candidate(1)[0] instances[i] = self.job_class(instance.id + n_parallel_jobs) - instances[i].launch(**candidate) self.__launch(instance=instances[i], candidate=candidate) # Display information From f552b20d8cbd9a4f3ad8381cdb311665291987d4 Mon Sep 17 00:00:00 2001 From: OleguerCanal Date: Fri, 15 Oct 2021 11:49:19 +0200 Subject: [PATCH 5/6] Fixed bug --- src/gaussian_process_search.py | 5 +++-- src/parallel_searcher.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/gaussian_process_search.py b/src/gaussian_process_search.py index eb77851..dc3ff1f 100644 --- a/src/gaussian_process_search.py +++ b/src/gaussian_process_search.py @@ -147,7 +147,7 @@ def add_point_value(self, point, value): self.x_values.append(p) self.y_values.append(value) - def get_next_candidate(self, n_points): + def get_next_candidate(self, n_points, n_initial_points=None): """Returns the next candidates for the skopt acquisition function Args: @@ -162,7 +162,8 @@ def get_next_candidate(self, n_points): optimizer = Optimizer( dimensions=self.search_space, base_estimator='gp', - # n_initial_points=len(self.x_values), + n_initial_points=n_initial_points, + initial_point_generator="random", acq_func='EI', n_jobs=-1 ) diff --git a/src/parallel_searcher.py b/src/parallel_searcher.py index dca74d6..2f3d71c 100644 --- a/src/parallel_searcher.py +++ b/src/parallel_searcher.py @@ -69,7 +69,7 @@ def optimize(self, self.optimizer.save_values() # Instantiate new job instance - candidate = self.optimizer.get_next_candidate(1)[0] + candidate = self.optimizer.get_next_candidate(1, n_random_starts)[0] instances[i] = self.job_class(instance.id + n_parallel_jobs) self.__launch(instance=instances[i], candidate=candidate) From c762aa0b8cc23a4766fb450ff028e17d068b08e0 Mon Sep 17 00:00:00 2001 From: OleguerCanal Date: Fri, 15 Oct 2021 11:56:01 +0200 Subject: [PATCH 6/6] Removed print --- src/gaussian_process_search.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/gaussian_process_search.py b/src/gaussian_process_search.py index dc3ff1f..613737c 100644 --- a/src/gaussian_process_search.py +++ b/src/gaussian_process_search.py @@ -170,8 +170,6 @@ def get_next_candidate(self, n_points, n_initial_points=None): # x_values = np.array(self.x_values) x_values = self.x_values y_values = y_values - print("x:", x_values) - print("y:", y_values) optimizer.tell(x_values, y_values, fit=True) points = optimizer.ask(n_points=n_points) return self._to_dict_list(points)