diff --git a/docs/remote.md b/docs/remote.md new file mode 100644 index 0000000..c7239be --- /dev/null +++ b/docs/remote.md @@ -0,0 +1,31 @@ +```mermaid + +sequenceDiagram + [User]->>RemoteFileTrialStore: create() + RemoteFileTrialStore->>ExperiturServer: create() + ExperiturServer->>FileTrialStore: create() + FileTrialStore-->>ExperiturServer: trial_id: str + ExperiturServer-->>RemoteFileTrialStore: trial_id: str + RemoteFileTrialStore-->>[User]: trial_id: str + + [User]->>RemoteFileTrialStore: set_data(trial_id: str, trial_data: dict) + RemoteFileTrialStore->>ExperiturServer: set_data(trial_id: str, trial_data: dict) + ExperiturServer->>FileTrialStore: set_data(trial_id: str, trial_data: dict) + + [User]->>RemoteFileTrialStore: get_data(trial_id: str) + RemoteFileTrialStore->>ExperiturServer: get_data(trial_id: str) + ExperiturServer->>FileTrialStore: get_data(trial_id: str) + FileTrialStore-->>ExperiturServer: trial_data: dict + ExperiturServer-->>RemoteFileTrialStore: trial_data: dict + RemoteFileTrialStore-->>[User]: trial_data: dict + + [User]->>RemoteFileTrialStore: filter(...) + RemoteFileTrialStore->>ExperiturServer: filter(...) + ExperiturServer->>FileTrialStore: filter(...) + FileTrialStore->>FileTrialStore: __iter__() + FileTrialStore-->>ExperiturServer: trial_datas: List[dict] + ExperiturServer-->>RemoteFileTrialStore: trial_datas: List[dict] + RemoteFileTrialStore-->>[User]: trial_datas: List[dict] + + +``` \ No newline at end of file diff --git a/experiments/optimization.py b/experiments/optimization.py new file mode 100644 index 0000000..8d8b37e --- /dev/null +++ b/experiments/optimization.py @@ -0,0 +1,116 @@ +import math +import operator + +import click +import matplotlib.pyplot as plt +import pandas as pd +import skopt +from scipy.stats import uniform +from skopt.plots import plot_convergence, plot_objective +from skopt.space import Categorical, Dimension, Integer, Real +from skopt.utils import dimensions_aslist, point_asdict, point_aslist + +from experitur import Experiment, Trial +from experitur.parameters import Grid, Multi, SKOpt +from experitur.parameters.skopt import convert_trial + + +def rosenbrock(a, b, x, y): + return (a - x) ** 2 + b * (y - x ** 2) ** 2 + + +@Grid({"a": [1], "b": [100]}) +@SKOpt({"x": Real(-10, 10), "y": Real(-10, 10)}, "z", 200) +@Grid({"repetition": [1, 2, 3]}) +@Experiment(active=False,) +def exp(trial: Trial): + z = trial.call(rosenbrock) + + print(z) + + return {"z": z} + + +def hyper(x, y): + return x ** 2 + y ** 2 + + +@Experiment( + parameters=SKOpt({"x": Real(-10, 10), "y": Real(-10, 10)}, "z", 100), active=False +) +def exp2(trial: Trial): + z = trial.call(hyper) + return {"z": z} + + +def fun(x, y, z): + return x ** 2 + math.sin(y) + z + + +@Experiment( + parameters=SKOpt( + {"x": Real(-10, 10), "y": Real(-10, 10), "z": Real(-10, 10)}, "res", 100 + ) +) +def exp3(trial: Trial): + res = trial.call(fun) + return {"res": res} + + +def as_dimension(d): + if isinstance(d, Dimension): + return d + if isinstance(d, list): + return Categorical(d) + raise ValueError(f"Unexpected dimension: {d!r}") + + +@click.argument("objective") +@click.option("--plot-objective", "plot_objective_", is_flag=True) +@click.option("--plot-convergence", "plot_convergence_", is_flag=True) +@exp3.command(target="experiment") +def show( + experiment: Experiment, objective, plot_objective_=False, plot_convergence_=False +): + """ + Show information about optimization. + + Run `experitur do optimization.py show exp3 res` to show information about the optimization that takes place in `exp3` regarding objective `res`. + """ + + search_space = { + n: as_dimension(d) + for n, d in experiment.parameter_generator.varying_parameters.items() + } + + trials = experiment.ctx.store.match(experiment=experiment) + + results = [ + convert_trial(trial, search_space, objective) + for trial in sorted(trials.values(), key=lambda trial: trial.data["time_start"]) + ] + + if not results: + print("No results.") + return + + X, Y = zip(*results) + + optimizer = skopt.Optimizer(dimensions_aslist(search_space)) + + optimize_result = optimizer.tell(X, Y) + + print( + f"Current optimium {optimize_result.fun} at {point_asdict(search_space, optimize_result.x)}" + ) + + if plot_objective_: + plot_objective(optimize_result, levels=20) + plt.show() + elif plot_convergence_: + plot_convergence(optimize_result) + plt.show() + else: + print( + "Use --plot-convergence to plot the convergence trace or --plot-objective to show the pairwise dependence plot of the objective function." + ) diff --git a/experitur/core/context.py b/experitur/core/context.py index 28c5431..277be3e 100644 --- a/experitur/core/context.py +++ b/experitur/core/context.py @@ -87,11 +87,6 @@ def __init__( else: self.wdir = wdir - # Import here to break dependency cycle - from experitur.core.trial_store import FileTrialStore - - self.store = FileTrialStore(self) - # Configuration if config is None: self.config = self._default_config.copy() @@ -103,6 +98,10 @@ def __init__( def _register_experiment(self, experiment): self.registered_experiments.append(experiment) + def create_trial(self, trial_data, experiment: Experiment) -> TrialData: + trial_id = self.store.create() + return + def run(self, experiments=None): """ Run the specified experiments or all. diff --git a/experitur/core/experiment.py b/experitur/core/experiment.py index ea25004..47c2e20 100644 --- a/experitur/core/experiment.py +++ b/experitur/core/experiment.py @@ -265,7 +265,7 @@ def run(self): if self.ctx.config["skip_existing"]: # Check, if a trial with this parameter set already exists - existing = self.ctx.store.match( + existing = self.ctx.trials.filter( func=self.func, parameters=trial_configuration.get("parameters", {}), ) diff --git a/experitur/core/trial.py b/experitur/core/trial.py index da37d44..e3bd6cf 100644 --- a/experitur/core/trial.py +++ b/experitur/core/trial.py @@ -2,7 +2,7 @@ import inspect import itertools from collections import OrderedDict, defaultdict -from collections.abc import Collection +from collections.abc import Collection, Sequence from typing import ( TYPE_CHECKING, Any, @@ -10,16 +10,17 @@ Iterable, List, Mapping, + Optional, Tuple, TypeVar, Union, ) - from experitur.core.logger import YAMLLogger if TYPE_CHECKING: # pragma: no cover from experitur.core.experiment import Experiment + from experitur.core.trial_store import TrialStore T = TypeVar("T") @@ -382,7 +383,7 @@ def log(self, values, **kwargs): self._logger.log(values) -class TrialCollection(Collection): +class TrialCollectionBase(Sequence): _missing = object() def __init__(self, trials: List[Trial]): @@ -403,7 +404,7 @@ def pop(self, index=-1): @property def independent_parameters(self): independent_parameters = set() - for t in self.trials: + for t in self: independent_parameters.update( getattr(t, "experiment", {}).get("independent_parameters", []) ) @@ -414,7 +415,7 @@ def varying_parameters(self): """Independent parameters that vary in this trial collection.""" independent_parameters = self.independent_parameters parameter_values = defaultdict(set) - for t in self.trials: + for t in self: for p in independent_parameters: try: v = t[p] @@ -425,28 +426,71 @@ def varying_parameters(self): return set(p for p in independent_parameters if len(parameter_values[p]) > 1) - def to_pandas(self): - import pandas as pd + def to_pandas(self, failed=True): + try: + from pandas import json_normalize + except ImportError: + try: + from pandas.io.json import json_normalize + except ImportError: + raise RuntimeError("pandas is not available.") + + return json_normalize( + [t.data for t in self if failed or not t.is_failed], max_level=1 + ).set_index("id") + + @abstractmethod + def one(self): + pass - return pd.json_normalize([t.data for t in self.trials], max_level=1).set_index( - "id" - ) + def filter( + self, + filter_fn: Optional[Callable[[TrialData], bool]], + func=None, + parameters=None, + experiment=None, + ) -> "TrialCollection": + """ + Return a filtered version of this trial collection. + + Args: + func (callable): A function that receives a TrialData instance and returns True if the trial should be kept. + + Returns: + A new trial collection. + """ + + if func is not None: + raise NotImplementedError() + + if parameters is not None: + raise NotImplementedError() + + if experiment is not None: + raise NotImplementedError() + + return TrialCollection(list(filter(filter_fn, self))) def one(self): - if len(self.trials) != 1: + if len(self) != 1: raise ValueError("No individual trial.") - return self.trials[0] + return self[0] def filter(self, fn: Callable[[Trial], bool]) -> "TrialCollection": """ Return a filtered version of this trial collection. Args: - fn (callable): A function that receives a TrialData instance and returns True if the trial should be kept. + filter_fn (callable): A function that receives the trial data dictionary and returns True if the trial should be kept. Returns: A new trial collection. """ - return TrialCollection(list(filter(fn, self.trials))) + return TrialCollection( + self._store.match( + func=func, experiment=experiment, resolved_parameters=parameters + ) + ).filter(filter_fn) + diff --git a/experitur/core/trial_store.py b/experitur/core/trial_store.py index dae925c..fe47da2 100644 --- a/experitur/core/trial_store.py +++ b/experitur/core/trial_store.py @@ -47,9 +47,20 @@ class KeyExistsError(Exception): class TrialStore(collections.abc.MutableMapping): + _implementations: Dict[str, "TrialStore"] = {} + def __init__(self, ctx: "Context"): self.ctx = ctx + # Class methods for the registration of trial store implementations. + @classmethod + def _register_implementation(cls): + TrialStore._implementations[cls.__name__] = cls + + @staticmethod + def get_implementation(implementation_name) -> "TrialStore": + return TrialStore._implementations[implementation_name] + def match( self, func=None, parameters=None, experiment=None, resolved_parameters=None ) -> List[Dict]: @@ -173,6 +184,38 @@ def delete_all(self, keys): del self[k] +class MemoryTrialStore(TrialStore): + def __init__(self, ctx: "Context"): + super().__init__(ctx) + self.data = {} + self._lock = threading.Lock() + + def __len__(self): + return len(self.data) + + def __getitem__(self, key): + trial_data = self.data[key] + return TrialData(self, data=trial_data) + + def __iter__(self): + return iter(self.data) + + def __setitem__(self, key, trial_data: TrialData): + self.data[key] = trial_data.data + + def __delitem__(self, key): + del self.data[key] + + def lock(self): + self._lock.acquire() + + def release(self): + self._lock.release() + + +MemoryTrialStore._register_implementation() # pylint: disable=protected-access + + class FileTrialStore(TrialStore): TRIAL_FN = "trial.yaml" DUMPER = ExperiturDumper @@ -257,3 +300,49 @@ def __delitem__(self, trial_id): raise KeyError shutil.rmtree(os.path.dirname(path)) + + def lock(self): + # TODO: Implement locking + pass + + def release(self): + # TODO: Implement locking + pass + + +# Make implementation known to the base class +FileTrialStore._register_implementation() # pylint: disable=protected-access + +try: + import zerorpc +except ImportError: + pass +else: + + class RemoteFileTrialStore(TrialStore): + def __init__(self, ctx: "Context"): + super().__init__(ctx) + + endpoint = self.ctx.config["remote_endpoint"] + self.client = zerorpc.Client(endpoint) + + def create(self, trial_configuration, experiment: "Experiment"): + return self.client.create_trial(trial_id) + + def __delitem__(self, trial_id): + self.client.del_trial(trial_id) + + def __getitem__(self, trial_id) -> TrialData: + return TrialData(self, self.client.get_trial_data(trial_id)) + + def __setitem__(self, trial_id, trial_data: TrialData): + return self.client.set_trial_data(trial_id, trial_data.data) + + def __iter__(self): + return iter(self.client.get_trial_ids()) + + def __len__(self): + return self.client.get_n_trials() + + # RemoteFileTrialStore._register_implementation() # pylint: disable=protected-access + diff --git a/experitur/server.py b/experitur/server.py new file mode 100644 index 0000000..4a6c348 --- /dev/null +++ b/experitur/server.py @@ -0,0 +1,20 @@ +from typing import TYPE_CHECKING + +import zerorpc + +from experitur.core.trial import TrialData + +if TYPE_CHECKING: + from experitur.core.context import Context + + +class ExperiturServer(zerorpc.Server): + def __init__(self, ctx: "Context"): + super().__init__() + self.ctx = ctx + + def get_trial_data(self, trial_id): + return self.ctx.store[trial_id] + + def set_trial_data(self, trial_id, trial_data): + self.ctx.store[trial_id] = TrialData(self.ctx.store, trial_data) diff --git a/setup.py b/setup.py index 2a82a5e..e445012 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ python_requires=">=3.6", extras_require={ "tests": ["pytest", "pytest-cov"], - "optional": ["scikit-optimize", "scikit-learn", "pandas"], + "optional": ["scikit-optimize", "scikit-learn", "pandas", "zerorpc"], "docs": [ "sphinx >= 1.4", "sphinx_rtd_theme", diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..003207a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,6 @@ +import pytest + + +@pytest.fixture() +def random_ipc_endpoint(tmp_path): + return f"ipc://{tmp_path!s}/ipc" diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..36b82e5 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,47 @@ +from experitur.server import ExperiturServer +from experitur.core.context import Context + +try: + import gevent + import zerorpc +except ImportError: + pass +else: + + def test_server(tmp_path, random_ipc_endpoint): + config = {"remote_endpoint": random_ipc_endpoint} + with Context(str(tmp_path), config) as ctx: + server = ExperiturServer(ctx) + server.bind(random_ipc_endpoint) + gevent.spawn(server.run) + + client = zerorpc.Client(random_ipc_endpoint) + + client.set_trial_data("some_id", {"wdir": ".", "id": "foo"}) + + def test_zerorpc(random_ipc_endpoint): + class MySrv(zerorpc.Server): + def __init__(self): + super().__init__() + self.ctx = None + + def lock(self): + pass + + def release(self): + pass + + def get_trial_data(self, trial_id): + pass + + def set_trial_data(self, trial_id, trial_data): + pass + + srv = MySrv() + srv.bind(random_ipc_endpoint) + gevent.spawn(srv.run) + + client = zerorpc.Client() + client.connect(random_ipc_endpoint) + + client.set_trial_data("trial_id", {"wdir": ".", "id": "foo"}) diff --git a/tests/test_trial_store.py b/tests/test_trial_store.py index 4aff47a..1c7911a 100644 --- a/tests/test_trial_store.py +++ b/tests/test_trial_store.py @@ -26,15 +26,42 @@ def test__format_independent_parameters(): assert _format_trial_id("foo", parameters, ["a", "b"]) == "foo/a-1_b-2" -@pytest.fixture(name="TrialStoreImplementation", params=[FileTrialStore]) -def _TrialStoreImplementation(request): - return request.param +def test_file_trial_store(tmp_path): + config = {"store": "FileTrialStore"} + with Context(str(tmp_path), config) as ctx: + ctx: Context + with ctx.store: + ctx.store: FileTrialStore + # Check that folders named {}/trial.yaml do not disturb the store + fake_folder = os.path.join(ctx.wdir, ctx.store.PATTERN.format("fake")) + os.makedirs(fake_folder, exist_ok=True) + assert len(ctx.store) == 0 -def test_trial_store(tmp_path, TrialStoreImplementation: Type[TrialStore]): - with Context(str(tmp_path), writable=True) as ctx: - ctx: Context - trial_store: TrialStore = TrialStoreImplementation(ctx) +@pytest.mark.parametrize( + "store", TrialStore._implementations.keys() # pylint: disable=protected-access +) +def test_trial_store(tmp_path_factory, store, random_ipc_endpoint): + config = {"store": store} + + if store == "RemoteFileTrialStore": + config["remote_endpoint"] = random_ipc_endpoint + + # Start server + import gevent + + from experitur.server import ExperiturServer + + server_config = {"store": "MemoryTrialStore"} + server_ctx = Context(str(tmp_path_factory.mktemp("server_ctx")), server_config) + server = ExperiturServer(server_ctx) + server.bind(random_ipc_endpoint) + gevent.spawn(server.run) + + with Context( + str(tmp_path_factory.mktemp("client_ctx")), config, writable=True + ) as ctx: + trial_store: TrialStore = ctx.store @Experiment("test", parameters={"a": [1, 2], "b": [2, 3]}) def test(_):