diff --git a/__init__.py b/__init__.py index e69de29..f0a5d64 100644 --- a/__init__.py +++ b/__init__.py @@ -0,0 +1,8 @@ +import os +import sys + +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from src.gaussian_process_search import GaussianProcessSearch +from src.parallel_searcher import ParallelSearcher +from src.search_job_instance import SearchJobInstance diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gaussian_process.py b/src/gaussian_process_search.py similarity index 95% rename from gaussian_process.py rename to src/gaussian_process_search.py index 99f640f..613737c 100644 --- a/gaussian_process.py +++ b/src/gaussian_process_search.py @@ -12,8 +12,8 @@ from skopt.plots import plot_objective, plot_evaluations from skopt import dump, load -sys.path.append(os.path.dirname(os.path.abspath(__file__))) -import load_save +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import utilities.io # Session variables session_params = {} @@ -46,7 +46,7 @@ def __init__(self, search_space, fixed_space, evaluator, input_file=None, output self.solutions = [] if input_file is not None: try: - data_dict = load_save.load(data_file=input_file) + data_dict = utilities.io.load(data_file=input_file) self.x_values, self.y_values = self._extract_values(data_dict) except OSError as e: raise OSError('Cannot read input file. 
\n' + str(e)) @@ -147,7 +147,7 @@ def add_point_value(self, point, value): self.x_values.append(p) self.y_values.append(value) - def get_next_candidate(self, n_points): + def get_next_candidate(self, n_points, n_initial_points=None): """Returns the next candidates for the skopt acquisition function Args: @@ -162,10 +162,15 @@ def get_next_candidate(self, n_points): optimizer = Optimizer( dimensions=self.search_space, base_estimator='gp', - n_initial_points=len(self.x_values), - acq_func='EI' + n_initial_points=n_initial_points if n_initial_points is not None else len(self.x_values), + initial_point_generator="random", + acq_func='EI', + n_jobs=-1 ) - optimizer.tell(self.x_values, y_values) # TODO Does this fit the values??? + # x_values = np.array(self.x_values) + x_values = self.x_values + y_values = y_values + optimizer.tell(x_values, y_values, fit=True) points = optimizer.ask(n_points=n_points) return self._to_dict_list(points) @@ -263,7 +268,7 @@ def save_values(self): """ data_dict = self._pack_values() - load_save.save(self.output_file, data_dict) + utilities.io.save(self.output_file, data_dict) @staticmethod def _to_key_value(values): @@ -319,4 +324,4 @@ def save_checkpoint(self, res): res_dict[dimension.name].append(point[i]) res_dict['value'] = y_values - load_save.save(self.output_file, res_dict) + utilities.io.save(self.output_file, res_dict) diff --git a/src/parallel_searcher.py b/src/parallel_searcher.py new file mode 100644 index 0000000..2f3d71c --- /dev/null +++ b/src/parallel_searcher.py @@ -0,0 +1,80 @@ + +import time + +class ParallelSearcher: + def __init__(self, optimizer, job_class): + """ Instantiate parallel searching + + Args: + optimizer (GaussianProcessSearch): Optimizer used to find next points to test + job_class (SearchJobInstance): Implementation of SearchJobInstance to manage jobs + """ + self.optimizer = optimizer + self.job_class = job_class + + def __launch(self, instance, candidate, retrials=5): + launch_status = 1 + attempt = 0 + while launch_status != 0 and attempt < retrials: + 
launch_status = instance.launch(**candidate) + if launch_status != 0: + print("There was some error launching the instance. Retrying (" + str(attempt) + "/" + str(retrials) + ")") + time.sleep(0.1) + attempt += 1 + + def optimize(self, + n_calls=10, + n_random_starts=5, + noise=0.01, + n_parallel_jobs=1, + refresh_rate=1, + first_id=0, + verbose=True, + plot_results=False): + + # Instantiate all initial jobs + instances = [self.job_class(i) for i in range(first_id, first_id + n_parallel_jobs)] + + # Get all initial candidates + candidates = [] + if len(self.optimizer.x_values) == 0: # If first points, sample random + candidates = self.optimizer.get_random_candidate(n_parallel_jobs) + else: + candidates = self.optimizer.get_next_candidate(n_parallel_jobs) + + # Launch all instances + for i in range(n_parallel_jobs): + print(candidates[i]) + self.__launch(instance=instances[i], candidate=candidates[i]) + n_calls -= 1 + + while n_calls > 0: + time.sleep(refresh_rate) # refresh rate in seconds + for i in range(n_parallel_jobs): + instance = instances[i] + if instance.done(): + n_calls -= 1 + instance_params = instance.passed_args + instance_result = instance.get_result() + + # Display information + print("*****") + print("Finished job:", instance.id) + print("Instance_params:", instance_params) + print("Instance_result:", instance_result) + print("*****") + + # Add point-evaluation info to the optimizer + self.optimizer.add_point_value(instance_params, instance_result) + self.optimizer.save_values() + + # Instantiate new job instance + candidate = self.optimizer.get_next_candidate(1, n_random_starts)[0] + instances[i] = self.job_class(instance.id + n_parallel_jobs) + self.__launch(instance=instances[i], candidate=candidate) + + # Display information + print("*****") + print("Starting job:", instances[i].id) + print("Instance_params:", candidate) + print("*****") \ No newline at end of file diff --git a/src/search_job_instance.py b/src/search_job_instance.py new file mode 
100644 index 0000000..af7d883 --- /dev/null +++ b/src/search_job_instance.py @@ -0,0 +1,44 @@ +from abc import ABC, abstractmethod + +class SearchJobInstance(ABC): + """Abstract class linked to a single job. + It is used to manage + """ + def __init__(self, id): + self.id = id + + @abstractmethod + def launch(self, **kwargs) -> int: + """Execute command given the objective arguments + IMPORTANT: Must be non-blocking! + + Returns: + status(Int): 0 everything is ok, 1 there was some error + """ + self.passed_args = kwargs + raise NotImplementedError + + @abstractmethod + def get_result(self) -> float: + """Return final result of the optimization + """ + raise NotImplementedError + + @abstractmethod + def done(self) -> bool: + """True if job has finished, false otherwise + IMPORTANT: Must be non-blocking! + """ + raise NotImplementedError + + @abstractmethod + def kill(self) -> None: + """Finish job + """ + raise NotImplementedError + + @abstractmethod + def end(self) -> None: + """Run any task necessary when done + """ + raise NotImplementedError \ No newline at end of file diff --git a/utilities/__init__.py b/utilities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/load_save.py b/utilities/io.py similarity index 100% rename from load_save.py rename to utilities/io.py