From 0489659f99c64fd88c9435c07fbb4fc1505c663c Mon Sep 17 00:00:00 2001 From: Simon-Martin Schroeder Date: Tue, 30 Jun 2020 10:41:07 +0200 Subject: [PATCH] wip: Remote --- docs/remote.md | 31 ++++++++ experitur/core/context.py | 48 +++++------- experitur/core/experiment.py | 5 +- experitur/core/trial.py | 144 +++++++++++++++++++++++----------- experitur/core/trial_store.py | 120 ++++++++++++++++++++++++++-- experitur/parameters/skopt.py | 4 +- experitur/server.py | 20 +++++ setup.py | 2 +- tests/conftest.py | 6 ++ tests/test_server.py | 47 +++++++++++ tests/test_trial_store.py | 87 ++++++++++++-------- 11 files changed, 397 insertions(+), 117 deletions(-) create mode 100644 docs/remote.md create mode 100644 experitur/server.py create mode 100644 tests/conftest.py create mode 100644 tests/test_server.py diff --git a/docs/remote.md b/docs/remote.md new file mode 100644 index 0000000..c7239be --- /dev/null +++ b/docs/remote.md @@ -0,0 +1,31 @@ +```mermaid + +sequenceDiagram + [User]->>RemoteFileTrialStore: create() + RemoteFileTrialStore->>ExperiturServer: create() + ExperiturServer->>FileTrialStore: create() + FileTrialStore-->>ExperiturServer: trial_id: str + ExperiturServer-->>RemoteFileTrialStore: trial_id: str + RemoteFileTrialStore-->>[User]: trial_id: str + + [User]->>RemoteFileTrialStore: set_data(trial_id: str, trial_data: dict) + RemoteFileTrialStore->>ExperiturServer: set_data(trial_id: str, trial_data: dict) + ExperiturServer->>FileTrialStore: set_data(trial_id: str, trial_data: dict) + + [User]->>RemoteFileTrialStore: get_data(trial_id: str) + RemoteFileTrialStore->>ExperiturServer: get_data(trial_id: str) + ExperiturServer->>FileTrialStore: get_data(trial_id: str) + FileTrialStore-->>ExperiturServer: trial_data: dict + ExperiturServer-->>RemoteFileTrialStore: trial_data: dict + RemoteFileTrialStore-->>[User]: trial_data: dict + + [User]->>RemoteFileTrialStore: filter(...) + RemoteFileTrialStore->>ExperiturServer: filter(...) + ExperiturServer->>FileTrialStore: filter(...) + FileTrialStore->>FileTrialStore: __iter__() + FileTrialStore-->>ExperiturServer: trial_datas: List[dict] + ExperiturServer-->>RemoteFileTrialStore: trial_datas: List[dict] + RemoteFileTrialStore-->>[User]: trial_datas: List[dict] + + +``` \ No newline at end of file diff --git a/experitur/core/context.py b/experitur/core/context.py index 3101be5..b66959f 100644 --- a/experitur/core/context.py +++ b/experitur/core/context.py @@ -2,11 +2,11 @@ from pathlib import Path from typing import TYPE_CHECKING, List, Union +from experitur.core.trial import RootTrialCollection, TrialCollection from experitur.errors import ExperiturError if TYPE_CHECKING: # pragma: no cover from experitur.core.experiment import Experiment - from experitur.core.trial import TrialCollection class ContextError(ExperiturError): @@ -75,20 +75,32 @@ def __init__(self, wdir=None, config=None): else: self.wdir = wdir - # Import here to break dependency cycle - from experitur.core.trial_store import FileTrialStore - - self.store = FileTrialStore(self) - # Configuration if config is None: self.config = self._default_config.copy() else: self.config = dict(self._default_config, **config) + self.store = self._initialize_store() + self.trials = RootTrialCollection(self.store) + + def _initialize_store(self): + + # Import here to break dependency cycle + from experitur.core.trial_store import TrialStore + + store_cls = TrialStore.get_implementation( + self.config.get("store", "FileTrialStore") + ) + return store_cls(self) + def _register_experiment(self, experiment): self.registered_experiments.append(experiment) + def create_trial(self, trial_data, experiment: Experiment) -> TrialData: + trial_id = self.store.create() + return + def run(self, experiments=None): """ Run the specified experiments or all. @@ -118,26 +130,7 @@ def collect(self, results_fn: Union[str, Path], failed=False): if isinstance(results_fn, Path): results_fn = str(results_fn) - try: - import pandas as pd - except ImportError: # pragma: no cover - raise RuntimeError("pandas is not available.") - - try: - from pandas import json_normalize - except ImportError: - from pandas.io.json import json_normalize - - data = [] - for trial_id, trial in self.store.items(): - if not failed and not trial.data.get("success", False): - # Skip failed trials if failed=False - continue - - data.append(trial.data) - - data = json_normalize(data, max_level=1).set_index("id") - + data = self.trials.to_pandas(failed=failed) data.to_csv(results_fn) def get_experiment(self, name) -> "Experiment": @@ -159,9 +152,6 @@ def get_experiment(self, name) -> "Experiment": print(self.registered_experiments) raise KeyError(name) from None - def get_trials(self, parameters=None, experiment=None) -> "TrialCollection": - return self.store.match(resolved_parameters=parameters, experiment=experiment) - def do(self, target, cmd, cmd_args): experiment_name = target.split("/")[0] diff --git a/experitur/core/experiment.py b/experitur/core/experiment.py index 8499bbd..3142940 100644 --- a/experitur/core/experiment.py +++ b/experitur/core/experiment.py @@ -248,7 +248,7 @@ def run(self): if self.ctx.config["skip_existing"]: # Check, if a trial with this parameter set already exists - existing = self.ctx.store.match( + existing = self.ctx.trials.filter( func=self.func, parameters=trial_configuration.get("parameters", {}), ) @@ -270,8 +270,7 @@ def run(self): # Run the trial try: - with tqdm_redirect.redirect_stdout(): - trial.run() + trial.run() except Exception: msg = textwrap.indent(traceback.format_exc(-1), " ") pbar.write("{} failed!".format(trial.data["id"])) diff --git a/experitur/core/trial.py b/experitur/core/trial.py index e45ad41..080f977 100644 --- a/experitur/core/trial.py +++ b/experitur/core/trial.py @@ -1,40 +1,31 @@ import collections.abc -import copy import datetime -import glob import inspect import itertools import os.path -import shutil import traceback -import warnings from abc import abstractmethod from collections import OrderedDict, defaultdict -from collections.abc import Collection +from collections.abc import Collection, Sequence from typing import ( TYPE_CHECKING, Any, Callable, - Dict, + Generator, Iterable, List, Mapping, + Optional, Tuple, TypeVar, Union, - overload, ) -import yaml - -from experitur.core.logger import LoggerBase, YAMLLogger -from experitur.helpers.dumper import ExperiturDumper -from experitur.helpers.merge_dicts import merge_dicts -from experitur.recursive_formatter import RecursiveDict -from experitur.util import callable_to_name +from experitur.core.logger import YAMLLogger if TYPE_CHECKING: # pragma: no cover from experitur.core.experiment import Experiment + from experitur.core.trial_store import TrialStore T = TypeVar("T") @@ -163,7 +154,7 @@ def __getattr__(self, name): ) if len(trials_data) == 1: - trial_data = trials_data.pop() + trial_data = trials_data.one() return Trial(trial_data) elif len(trials_data) > 1: msg = "Multiple matching parent experiments: " + ", ".join( @@ -188,7 +179,7 @@ def record_defaults(self, func: Callable, **defaults): Use :py:class:`functools.partial` to pass keyword parameters to `func` that should not be recorded. """ - __tracebackhide__ = True + __tracebackhide__ = True # pylint: disable=unused-variable if not callable(func): raise ValueError("Only callables may be passed as first argument.") @@ -407,7 +398,7 @@ class TrialData: Arguments store: TrialStore - data (optional): Trial data dictionary. + data: Trial data dictionary. func (optional): Experiment function. """ @@ -496,28 +487,15 @@ def get_result(self, name): return result.get(name, None) -class TrialCollection(Collection): +class TrialCollectionBase(Sequence): _missing = object() - def __init__(self, trials: List[TrialData]): - self.trials = trials - - def __len__(self): - return len(self.trials) - - def __iter__(self): - yield from self.trials - - def __contains__(self, trial: TrialData): - return trial in self.trials - - def pop(self, index=-1): - return self.trials.pop(index) + __iter__: Generator[TrialData, None, None] @property def independent_parameters(self): independent_parameters = set() - for t in self.trials: + for t in self: independent_parameters.update( t.data.get("experiment", {}).get("independent_parameters", []) ) @@ -528,7 +506,7 @@ def varying_parameters(self): """Independent parameters that vary in this trial collection.""" independent_parameters = self.independent_parameters parameter_values = defaultdict(set) - for t in self.trials: + for t in self: for p in independent_parameters: try: v = t.data["parameters"][p] @@ -539,28 +517,106 @@ def varying_parameters(self): return set(p for p in independent_parameters if len(parameter_values[p]) > 1) - def to_pandas(self): - import pandas as pd + def to_pandas(self, failed=True): + try: + from pandas import json_normalize + except ImportError: + try: + from pandas.io.json import json_normalize + except ImportError: + raise RuntimeError("pandas is not available.") + + return json_normalize( + [t.data for t in self if failed or not t.is_failed], max_level=1 + ).set_index("id") + + @abstractmethod + def one(self): + pass - return pd.json_normalize([t.data for t in self.trials], max_level=1).set_index( - "id" - ) + def filter( + self, + filter_fn: Optional[Callable[[TrialData], bool]], + func=None, + parameters=None, + experiment=None, + ) -> "TrialCollection": + """ + Return a filtered version of this trial collection. + + Args: + func (callable): A function that receives a TrialData instance and returns True if the trial should be kept. + + Returns: + A new trial collection. + """ + + if func is not None: + raise NotImplementedError() + + if parameters is not None: + raise NotImplementedError() + + if experiment is not None: + raise NotImplementedError() + + return TrialCollection(list(filter(filter_fn, self))) def one(self): - if len(self.trials) != 1: + if len(self) != 1: raise ValueError("No individual trial.") - return self.trials[0] + return self[0] + - def filter(self, fn: Callable[[TrialData], bool]) -> "TrialCollection": +class TrialCollection(TrialCollectionBase): + def __init__(self, trials: List[TrialData]): + self.trials = trials + + def __getitem__(self, index): + return self.trials[index] + + def __len__(self): + return len(self.trials) + + def __iter__(self) -> Generator[TrialData, None, None]: + yield from self.trials + + def __contains__(self, trial: TrialData): + return trial in self.trials + + +class RootTrialCollection(TrialCollectionBase): + def __init__(self, store: "TrialStore"): + self._store = store + + def __len__(self): + return len(self._store) + + def __getitem__(self, index) -> TrialData: + trial_id = sorted(self._store.keys())[index] + return TrialData(self._store, store[trial_id]) + + def filter( + self, + filter_fn: Optional[Callable[[dict], bool]] = None, + func=None, + parameters=None, + experiment=None, + ) -> "TrialCollection": """ Return a filtered version of this trial collection. Args: - fn (callable): A function that receives a TrialData instance and returns True if the trial should be kept. + filter_fn (callable): A function that receives the trial data dictionary and returns True if the trial should be kept. Returns: A new trial collection. """ - return TrialCollection(list(filter(fn, self.trials))) + return TrialCollection( + self._store.match( + func=func, experiment=experiment, resolved_parameters=parameters + ) + ).filter(filter_fn) + diff --git a/experitur/core/trial_store.py b/experitur/core/trial_store.py index 0b4c234..72606b2 100644 --- a/experitur/core/trial_store.py +++ b/experitur/core/trial_store.py @@ -3,7 +3,9 @@ import itertools import os.path import shutil -from typing import List, Mapping +import threading +from abc import abstractmethod +from typing import TYPE_CHECKING, Dict, List, Mapping import yaml @@ -13,6 +15,9 @@ from experitur.recursive_formatter import RecursiveDict from experitur.util import callable_to_name +if TYPE_CHECKING: + from experitur.core.context import Context + def _match_parameters(parameters_1, parameters_2): """Decide whether parameters_1 are a subset of parameters_2.""" @@ -39,14 +44,41 @@ def _format_independent_parameters( class TrialStore(collections.abc.MutableMapping): - def __init__(self, ctx): + _implementations: Dict[str, "TrialStore"] = {} + + def __init__(self, ctx: "Context"): self.ctx = ctx + # Class methods for the registration of trial store implementations. + @classmethod + def _register_implementation(cls): + TrialStore._implementations[cls.__name__] = cls + + @staticmethod + def get_implementation(implementation_name) -> "TrialStore": + return TrialStore._implementations[implementation_name] + + # Context manager logic: Open and close trial store. def __enter__(self): + self.open() return self def __exit__(self, *_): - pass + self.close() + + def open(self): + """ + Open the trial store. + + Overrided in subclasses. + """ + + def close(self): + """ + Close the trial store. + + Overrided in subclasses. + """ def match( self, func=None, parameters=None, experiment=None, resolved_parameters=None @@ -179,6 +211,38 @@ def delete_all(self, keys): del self[k] +class MemoryTrialStore(TrialStore): + def __init__(self, ctx: "Context"): + super().__init__(ctx) + self.data = {} + self._lock = threading.Lock() + + def __len__(self): + return len(self.data) + + def __getitem__(self, key): + trial_data = self.data[key] + return TrialData(self, data=trial_data) + + def __iter__(self): + return iter(self.data) + + def __setitem__(self, key, trial_data: TrialData): + self.data[key] = trial_data.data + + def __delitem__(self, key): + del self.data[key] + + def lock(self): + self._lock.acquire() + + def release(self): + self._lock.release() + + +MemoryTrialStore._register_implementation() # pylint: disable=protected-access + + class FileTrialStore(TrialStore): PATTERN = os.path.join("{}", "trial.yaml") DUMPER = ExperiturDumper @@ -214,13 +278,13 @@ def __iter__(self): yield k - def __setitem__(self, key, value): + def __setitem__(self, key, trial_data: TrialData): path = os.path.join(self.ctx.wdir, self.PATTERN.format(key)) path = os.path.normpath(path) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as fp: - yaml.dump(value.data, fp, Dumper=self.DUMPER) + yaml.dump(trial_data.data, fp, Dumper=self.DUMPER) # raise KeyError @@ -233,3 +297,49 @@ def __delitem__(self, key): raise KeyError shutil.rmtree(os.path.dirname(path)) + + def lock(self): + # TODO: Implement locking + pass + + def release(self): + # TODO: Implement locking + pass + + +# Make implementation known to the base class +FileTrialStore._register_implementation() # pylint: disable=protected-access + +try: + import zerorpc +except ImportError: + pass +else: + + class RemoteFileTrialStore(TrialStore): + def __init__(self, ctx: "Context"): + super().__init__(ctx) + + endpoint = self.ctx.config["remote_endpoint"] + self.client = zerorpc.Client(endpoint) + + def create(self, trial_configuration, experiment: "Experiment"): + return self.client.create_trial(trial_id) + + def __delitem__(self, trial_id): + self.client.del_trial(trial_id) + + def __getitem__(self, trial_id) -> TrialData: + return TrialData(self, self.client.get_trial_data(trial_id)) + + def __setitem__(self, trial_id, trial_data: TrialData): + return self.client.set_trial_data(trial_id, trial_data.data) + + def __iter__(self): + return iter(self.client.get_trial_ids()) + + def __len__(self): + return self.client.get_n_trials() + + # RemoteFileTrialStore._register_implementation() # pylint: disable=protected-access + diff --git a/experitur/parameters/skopt.py b/experitur/parameters/skopt.py index 7b9e3f4..7e99385 100644 --- a/experitur/parameters/skopt.py +++ b/experitur/parameters/skopt.py @@ -71,7 +71,7 @@ def __iter__(self): for parent_configuration in self.parent: # Retrieve all trials that match parent_configuration - existing_trials = self.experiment.ctx.store.match( + existing_trials = self.experiment.ctx.trials.filter( func=self.experiment.func, parameters=parent_configuration.get("parameters", {}), ) @@ -104,7 +104,7 @@ def __iter__(self): ) # Train model - existing_trials = self.experiment.ctx.store.match( + existing_trials = self.experiment.ctx.trials.filter( func=self.experiment.func, parameters=parent_configuration.get("parameters", {}), ) diff --git a/experitur/server.py b/experitur/server.py new file mode 100644 index 0000000..4a6c348 --- /dev/null +++ b/experitur/server.py @@ -0,0 +1,20 @@ +from typing import TYPE_CHECKING + +import zerorpc + +from experitur.core.trial import TrialData + +if TYPE_CHECKING: + from experitur.core.context import Context + + +class ExperiturServer(zerorpc.Server): + def __init__(self, ctx: "Context"): + super().__init__() + self.ctx = ctx + + def get_trial_data(self, trial_id): + return self.ctx.store[trial_id] + + def set_trial_data(self, trial_id, trial_data): + self.ctx.store[trial_id] = TrialData(self.ctx.store, trial_data) diff --git a/setup.py b/setup.py index 2a82a5e..e445012 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ python_requires=">=3.6", extras_require={ "tests": ["pytest", "pytest-cov"], - "optional": ["scikit-optimize", "scikit-learn", "pandas"], + "optional": ["scikit-optimize", "scikit-learn", "pandas", "zerorpc"], "docs": [ "sphinx >= 1.4", "sphinx_rtd_theme", diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..003207a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,6 @@ +import pytest + + +@pytest.fixture() +def random_ipc_endpoint(tmp_path): + return f"ipc://{tmp_path!s}/ipc" diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..36b82e5 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,47 @@ +from experitur.server import ExperiturServer +from experitur.core.context import Context + +try: + import gevent + import zerorpc +except ImportError: + pass +else: + + def test_server(tmp_path, random_ipc_endpoint): + config = {"remote_endpoint": random_ipc_endpoint} + with Context(str(tmp_path), config) as ctx: + server = ExperiturServer(ctx) + server.bind(random_ipc_endpoint) + gevent.spawn(server.run) + + client = zerorpc.Client(random_ipc_endpoint) + + client.set_trial_data("some_id", {"wdir": ".", "id": "foo"}) + + def test_zerorpc(random_ipc_endpoint): + class MySrv(zerorpc.Server): + def __init__(self): + super().__init__() + self.ctx = None + + def lock(self): + pass + + def release(self): + pass + + def get_trial_data(self, trial_id): + pass + + def set_trial_data(self, trial_id, trial_data): + pass + + srv = MySrv() + srv.bind(random_ipc_endpoint) + gevent.spawn(srv.run) + + client = zerorpc.Client() + client.connect(random_ipc_endpoint) + + client.set_trial_data("trial_id", {"wdir": ".", "id": "foo"}) diff --git a/tests/test_trial_store.py b/tests/test_trial_store.py index 450c976..3ba4b63 100644 --- a/tests/test_trial_store.py +++ b/tests/test_trial_store.py @@ -27,16 +27,41 @@ def test__format_independent_parameters(): assert _format_independent_parameters(parameters, ["a", "b"]) == "a-1_b-2" -@pytest.fixture(name="TrialStoreImplementation", params=[FileTrialStore]) -def _TrialStoreImplementation(request): - return request.param +def test_file_trial_store(tmp_path): + config = {"store": "FileTrialStore"} + with Context(str(tmp_path), config) as ctx: + ctx: Context + with ctx.store: + ctx.store: FileTrialStore + # Check that folders named {}/trial.yaml do not disturb the store + fake_folder = os.path.join(ctx.wdir, ctx.store.PATTERN.format("fake")) + os.makedirs(fake_folder, exist_ok=True) + assert len(ctx.store) == 0 + + +@pytest.mark.parametrize( + "store", TrialStore._implementations.keys() # pylint: disable=protected-access +) +def test_trial_store(tmp_path_factory, store, random_ipc_endpoint): + config = {"store": store} + + if store == "RemoteFileTrialStore": + config["remote_endpoint"] = random_ipc_endpoint + # Start server + import gevent -def test_trial_store(tmp_path, TrialStoreImplementation: Type[TrialStore]): - with Context(str(tmp_path)) as ctx: + from experitur.server import ExperiturServer + + server_config = {"store": "MemoryTrialStore"} + server_ctx = Context(str(tmp_path_factory.mktemp("server_ctx")), server_config) + server = ExperiturServer(server_ctx) + server.bind(random_ipc_endpoint) + gevent.spawn(server.run) + + with Context(str(tmp_path_factory.mktemp("client_ctx")), config) as ctx: ctx: Context - with TrialStoreImplementation(ctx) as trial_store: - trial_store: TrialStore + with ctx.store: def test(trial): return {"result": (1, 2)} @@ -48,59 +73,55 @@ def test2(trial): experiment2 = Experiment("test2", parent=experiment)(test2) - trial_store["foo"] = TrialData( - trial_store, data={"id": "foo", "wdir": "", 1: "foo", "bar": 2} + ctx.store["foo"] = TrialData( + ctx.store, data={"id": "foo", "wdir": "", 1: "foo", "bar": 2} ) - assert trial_store["foo"].data == { + assert ctx.store["foo"].data == { "id": "foo", "wdir": "", 1: "foo", "bar": 2, } - trial_store["bar/baz"] = TrialData( - trial_store, data={"id": "bar/baz", "wdir": "", 1: "foo", "bar": 2} + ctx.store["bar/baz"] = TrialData( + ctx.store, data={"id": "bar/baz", "wdir": "", 1: "foo", "bar": 2} ) - assert trial_store["bar/baz"].data == { + assert ctx.store["bar/baz"].data == { "id": "bar/baz", "wdir": "", 1: "foo", "bar": 2, } - trial = trial_store.create({"parameters": {"a": 1, "b": 2}}, experiment) - - fake_folder = os.path.join(ctx.wdir, trial_store.PATTERN.format("fake")) - - os.makedirs(fake_folder, exist_ok=True) + trial = ctx.store.create({"parameters": {"a": 1, "b": 2}}, experiment) assert trial.data["id"] == "test/a-1_b-2" - assert "test/a-1_b-2" in trial_store + assert "test/a-1_b-2" in ctx.store - assert len(trial_store) == 3 + assert len(ctx.store) == 3 - del trial_store["bar/baz"] - del trial_store["foo"] + del ctx.store["bar/baz"] + del ctx.store["foo"] with pytest.raises(KeyError): - del trial_store["foo"] + del ctx.store["foo"] - trial_store.create({"parameters": {"a": 2, "b": 3}}, experiment) - trial_store.create({"parameters": {"a": 3, "b": 4}}, experiment) - trial_store.create({"parameters": {"a": 3, "b": 4}}, experiment2) + ctx.store.create({"parameters": {"a": 2, "b": 3}}, experiment) + ctx.store.create({"parameters": {"a": 3, "b": 4}}, experiment) + ctx.store.create({"parameters": {"a": 3, "b": 4}}, experiment2) - assert set(trial.id for trial in trial_store.match(func=test)) == { + assert set(trial.id for trial in ctx.store.match(func=test)) == { "test/a-1_b-2", "test/a-2_b-3", "test/a-3_b-4", } assert set( - trial.id for trial in trial_store.match(parameters={"a": 1, "b": 2}) + trial.id for trial in ctx.store.match(parameters={"a": 1, "b": 2}) ) == {"test/a-1_b-2"} - assert set(trial.id for trial in trial_store.match(experiment="test2")) == { + assert set(trial.id for trial in ctx.store.match(experiment="test2")) == { "test2/a-3_b-4" } @@ -113,11 +134,11 @@ def test2(trial): # Write trial data back to the store trial.save() - assert trial_store["test/a-1_b-2"].data["result"] == {"result": (1, 2)} + assert ctx.store["test/a-1_b-2"].data["result"] == {"result": (1, 2)} - trial_store.create({"parameters": {"a": 1, "b": 2, "c": 1}}, experiment) + ctx.store.create({"parameters": {"a": 1, "b": 2, "c": 1}}, experiment) experiment.parameter_generator.generators[0].grid["c"] = [1, 2, 3] - trial_store.create({"parameters": {"a": 1, "b": 2, "c": 2}}, experiment) - trial_store.create({"parameters": {"a": 1, "b": 2, "c": 2}}, experiment) + ctx.store.create({"parameters": {"a": 1, "b": 2, "c": 2}}, experiment) + ctx.store.create({"parameters": {"a": 1, "b": 2, "c": 2}}, experiment)