From 4c68e360140546093fd0d6e048b4d3f37b36a7df Mon Sep 17 00:00:00 2001
From: "basti.werner"
Date: Thu, 10 Nov 2022 10:20:38 +0100
Subject: [PATCH 1/7] minor fixes to improve testing

---
 .vscode/settings.json                       |  7 ++
 README.md                                   |  4 +-
 basht/__init__.py                           |  4 +-
 basht/utils/yaml.py                         |  6 +-
 experiments/optuna_minikube/check_trial.py  | 27 --------
 experiments/optuna_minikube/optuna_trial.py | 11 +--
 .../optuna_minikube/test_check_trial.py     | 68 +++++++++++++++++++
 7 files changed, 89 insertions(+), 38 deletions(-)
 create mode 100644 .vscode/settings.json
 delete mode 100644 experiments/optuna_minikube/check_trial.py
 create mode 100644 experiments/optuna_minikube/test_check_trial.py

diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..3e99ede
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,7 @@
+{
+    "python.testing.pytestArgs": [
+        "."
+    ],
+    "python.testing.unittestEnabled": false,
+    "python.testing.pytestEnabled": true
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index a89cee6..19afdfe 100644
--- a/README.md
+++ b/README.md
@@ -10,8 +10,8 @@ This repository supplies a job to benchmark hyperparameter tuning.
 
 ### Prerequisites
 
-* Ubuntu >= 16.04 (not tested on MAC OS)
-* Python >= 3.7.0
+* Ubuntu >= 20.04 (not tested on MAC OS)
+* Python >= 3.9.13
 * [PyTorch](https://pytorch.org/get-started/locally/)
 
 Note: If you run your benchmark on GPU make sure to install [Cuda](https://docs.nvidia.com/cuda/cuda-installation-guide-microsoft-windows/index.html) and install the correct PyTorch Version, which fits your Cuda version.
diff --git a/basht/__init__.py b/basht/__init__.py
index 2e24778..22fb3d2 100644
--- a/basht/__init__.py
+++ b/basht/__init__.py
@@ -4,7 +4,9 @@ __version__ = "develop"
 
 install_requires = [
     "scikit-learn==0.24.2",
-    "tqdm==4.64.1", "SQLAlchemy==1.4.31", "docker==4.4.2",
+    "tqdm==4.64.1",
+    "SQLAlchemy==1.4.31",
+    "docker==4.4.2",
     "psycopg2-binary",
     "prometheus-api-client==0.5.1",
     "ruamel.yaml==0.17.21"],
diff --git a/basht/utils/yaml.py b/basht/utils/yaml.py
index aae8668..2b59ccc 100644
--- a/basht/utils/yaml.py
+++ b/basht/utils/yaml.py
@@ -1,4 +1,5 @@
 from string import Template
+
 import ruamel.yaml
 
 
@@ -6,10 +7,9 @@ class YMLHandler:
 
     @staticmethod
     def load_yaml(file_path):
-        """Safely writes an object to a YAML-File.
+        """Loads a yaml file and returns a dictionary
 
         Args:
-            yaml_path (str): filename to write yaml to
-            obj (any): object to save as yaml
+            file_path (str): filename to load yaml from
         """
         with open(file_path, "r") as f:
             file_dict = ruamel.yaml.safe_load(f)
diff --git a/experiments/optuna_minikube/check_trial.py b/experiments/optuna_minikube/check_trial.py
deleted file mode 100644
index ef898e4..0000000
--- a/experiments/optuna_minikube/check_trial.py
+++ /dev/null
@@ -1,27 +0,0 @@
-
-import os
-from time import sleep
-from experiments.optuna_minikube.optuna_trial import main
-from basht.metrics_storage import MetricsStorage
-
-
-def test_check_trail():
-    metrics_storage = MetricsStorage()
-    try:
-        metrics_storage.start_db()
-        sleep(5)
-        os.environ["METRICS_STORAGE_HOST"] = MetricsStorage.host
-        os.environ["DB_CONN"] = MetricsStorage.connection_string
-        os.environ["N_TRIALS"] = "10"
-        os.environ["EPOCHS"] = "1"
-
-        f = main()
-        assert f
-
-        lats = metrics_storage.get_latency_results()
-        assert len(lats) >= int(os.environ["N_TRIALS"])*2 #(validate+train)
-    finally:
-        metrics_storage.stop_db()
-
-#TODO: do the same for the container ....
-# def test_trail_container():
diff --git a/experiments/optuna_minikube/optuna_trial.py b/experiments/optuna_minikube/optuna_trial.py
index 44b811c..93a2ad3 100644
--- a/experiments/optuna_minikube/optuna_trial.py
+++ b/experiments/optuna_minikube/optuna_trial.py
@@ -40,16 +40,15 @@ def __call__(self, trial):
         return validation_scores["macro avg"]["f1-score"]
 
 
-def main():
+def main(resource_def):
     try:
-        resource_path = os.path.join(os.path.dirname(__file__), "resource_definition.yml")
-        resource_def = YMLHandler.load_yaml(resource_path)
         study_name = os.environ.get("STUDY_NAME", "Test-Study")
         database_conn = os.environ.get("DB_CONN")
-        n_trials = int(os.environ.get("N_TRIALS", 2))
+
        hyperparameter = resource_def.get("hyperparameter")
         search_space = generate_search_space(hyperparameter)
         workload_def = resource_def.get("workload")
+        n_trials = resource_def.get("trials")
         optuna_trial = OptunaTrial(
             search_space, dl_framework=workload_def.get("dl_framework"),
             model_cls=workload_def.get("model_cls"),
@@ -69,7 +68,9 @@ def main():
 
 
 if __name__ == "__main__":
-    if main():
+    resource_path = os.path.join(os.path.dirname(__file__), "resource_definition.yml")
+    resource_def = YMLHandler.load_yaml(resource_path)
+    if main(resource_def):
         sys.exit(0)
     else:
         sys.exit(1)
diff --git a/experiments/optuna_minikube/test_check_trial.py b/experiments/optuna_minikube/test_check_trial.py
new file mode 100644
index 0000000..b5079a3
--- /dev/null
+++ b/experiments/optuna_minikube/test_check_trial.py
@@ -0,0 +1,68 @@
+
+import os
+from time import sleep
+from experiments.optuna_minikube.optuna_trial import main
+from basht.metrics_storage import MetricsStorage
+from basht.utils.yaml import YMLHandler
+
+
+def test_check_trail():
+    metrics_storage = MetricsStorage()
+    try:
+        metrics_storage.start_db()
+        sleep(5)
+        os.environ["METRICS_STORAGE_HOST"] = MetricsStorage.host
+        os.environ["DB_CONN"] = MetricsStorage.connection_string
+
+
+        # resource_path = os.path.join(os.path.dirname(__file__), "resource_definition.yml")
+        # resource_def = YMLHandler.load_yaml(resource_path)
+        resource_def = {
+            "trials": 2,
+            "workload": {
+                "dl_framework": "torch",
+                "task": {
+                    "loader": "mnist",
+                    "preprocessors": ["ImageFlattner"],
+                    "splitter": {
+                        "type": "StandardSplitter",
+                        "config": {
+                            "val_split": 0.2,
+                            "test_split": 0.2,
+                        },
+                    },
+                    "batcher": {
+                        "type": "StandardBatcher",
+                        "config": {
+                            "train_batch_size": 50,
+                            "val_batch_size": 50,
+                            "test_batch_size": 50,
+                        },
+                    },
+                },
+                "model_cls": "mlp",
+                "epochs": 2,
+                "device": "cpu",
+
+            },
+            "hyperparameter": {
+                "learning_rate": {
+                    "start": 1e-4,
+                    "step_size": 1e-3,
+                    "end": 1e-2
+                },
+
+            }
+        }
+
+        f = main(dict(resource_def))
+        assert f
+
+        lats = metrics_storage.get_latency_results()
+        assert len(lats) >= int(resource_def.get("trials")) * \
+            2  # (validate+train)
+    finally:
+        metrics_storage.stop_db()
+
+# TODO: do the same for the container ....
+# def test_trail_container():

From d3b184c60a083ab6dd814e5b05a74a84858920fd Mon Sep 17 00:00:00 2001
From: "basti.werner"
Date: Thu, 10 Nov 2022 10:21:00 +0100
Subject: [PATCH 2/7] added nfs metrics to resource tracker

---
 basht/metrics.py          |  2 ++
 basht/metrics_storage.py  |  1 +
 basht/resource_tracker.py | 10 +++++++---
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/basht/metrics.py b/basht/metrics.py
index 6adea31..631486f 100644
--- a/basht/metrics.py
+++ b/basht/metrics.py
@@ -34,6 +34,7 @@ def __init__(self, node_id):
         self.network_usage = None
         self.accelerator_usage = None
         self.wattage = None
+        self.nfs_usage = None
         self.processes = None
 
     def to_dict(self):
@@ -43,6 +44,7 @@ def to_dict(self):
             cpu_usage=self.cpu_usage,
             memory_usage=self.memory_usage,
             network_usage=self.network_usage,
+            nfs_usage=self.nfs_usage,
             wattage=self.wattage,
             processes=int(self.processes),
         )
diff --git a/basht/metrics_storage.py b/basht/metrics_storage.py
index 576fee6..48c788d 100644
--- a/basht/metrics_storage.py
+++ b/basht/metrics_storage.py
@@ -107,6 +107,7 @@ def create_resource_table(self):
             Column("cpu_usage", Float),
             Column("memory_usage", Float),
             Column("network_usage", Float),
+            Column("nfs_usage", Float),
             Column("accelerator_usage", Float),
             Column("wattage", Float),
             Column("processes", Integer),
diff --git a/basht/resource_tracker.py b/basht/resource_tracker.py
index 687908b..094b78c 100644
--- a/basht/resource_tracker.py
+++ b/basht/resource_tracker.py
@@ -42,7 +42,7 @@ def _check_metrics(self):
         available = set(self.prm.all_metrics())
 
         #check node_exporter metrics - cpu/memory
-        required = {"node_memory_MemFree_bytes", "node_memory_MemTotal_bytes", "node_cpu_seconds_total","scaph_host_power_microwatts","scaph_process_power_consumption_microwatts"}
+        required = {"node_memory_MemFree_bytes", "node_memory_MemTotal_bytes", "node_cpu_seconds_total","scaph_host_power_microwatts","scaph_process_power_consumption_microwatts","node_nfs_rpcs_total"}
         if not required.issubset(available):
             raise ValueError("Prometheus does not provide the required metrics.")
 
@@ -74,7 +74,7 @@ def _query(self):
         # ? is there a better way to map nodes using the node_exporter
         memory = 'avg by (instance) (node_memory_MemFree_bytes/node_memory_MemTotal_bytes)'
         cpu = '100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[2m])*100))'
-
+        nfs = 'sum by (node) (node_nfs_rpcs_total)' ##needs mapping
         network = f'sum by (instance) (rate({self.network_metric}_receive_bytes_total[2m])+rate({self.network_metric}_transmit_bytes_total[2m]))'
 
         #TODO: reduce measurments to only the ones we care about - dose currently not work with scaph_process_power_consumption_microwatts
@@ -90,6 +90,7 @@ def _query(self):
         cpu_result = self.prm.custom_query(cpu)
         network_result = self.prm.custom_query(network)
         wattage_result = self.prm.custom_query(wattage)
+        nfs_result = self.prm.custom_query(nfs)
         processes_result = self.prm.custom_query(processes)
 
         logging.debug("Got results from Prometheus.", mem_result, cpu_result, network_result)
@@ -99,11 +100,13 @@ def _query(self):
         #grab the data per instance
         mem_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), mem_result))
         cpu_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), cpu_result))
+
         network_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), network_result))
         wattage_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), wattage_result))
         processes_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), processes_result))
+        nfs_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), nfs_result))
 
-        logging.debug("Processed Prometheus Results", mem_result, cpu_result, network_result, wattage_result, processes_result)
+        logging.debug("Processed Prometheus Results", mem_result, cpu_result, network_result, wattage_result, processes_result, nfs_result)
 
         # assert mem_result.keys() == cpu_result.keys() == network_result.keys()
 
@@ -115,6 +118,7 @@ def _query(self):
             n.cpu_usage = cpu_result.get(instance, 0)
             n.memory_usage = mem_result.get(instance, 0)
             n.network_usage = network_result.get(instance, 0)
+            n.nfs_usage = nfs_result.get(instance, 0)
             if instance in wattage_result:
                 n.wattage = wattage_result[instance]
                 n.processes = processes_result[instance]

From d191f3eed1e0ef03a779e129ea7a8374b872ee65 Mon Sep 17 00:00:00 2001
From: "basti.werner"
Date: Thu, 10 Nov 2022 11:46:27 +0100
Subject: [PATCH 3/7] removed vsc settings

---
 .gitignore            | 1 +
 .vscode/settings.json | 7 -------
 2 files changed, 1 insertion(+), 7 deletions(-)
 delete mode 100644 .vscode/settings.json

diff --git a/.gitignore b/.gitignore
index f2ff438..9a57998 100644
--- a/.gitignore
+++ b/.gitignore
@@ -139,3 +139,4 @@ data/
 
 # .envs
 test/test_ml_benchmark/hyperparameter_space.yml
+.vscode/
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
deleted file mode 100644
index 3e99ede..0000000
--- a/.vscode/settings.json
+++ /dev/null
@@ -1,7 +0,0 @@
-{
-    "python.testing.pytestArgs": [
-        "."
-    ],
-    "python.testing.unittestEnabled": false,
-    "python.testing.pytestEnabled": true
-}
\ No newline at end of file

From 22fff6d5b339532349239f5fb1283b298795ab2f Mon Sep 17 00:00:00 2001
From: "basti.werner"
Date: Thu, 10 Nov 2022 13:59:07 +0100
Subject: [PATCH 4/7] [WIP] implemented Resouces dataclass

---
 basht/resources.py                      | 96 +++++++++++++++++++
 .../optuna_kubernetes_benchmark.py      |  2 +-
 .../optuna_kubernetes_rmem.py           | 41 ++++++++
 .../optuna_minikube/test_check_trial.py | 49 ++--------
 4 files changed, 144 insertions(+), 44 deletions(-)
 create mode 100644 basht/resources.py
 create mode 100644 experiments/optuna_kubernetes/optuna_kubernetes_rmem.py

diff --git a/basht/resources.py b/basht/resources.py
new file mode 100644
index 0000000..ddffbf7
--- /dev/null
+++ b/basht/resources.py
@@ -0,0 +1,96 @@
+from dataclasses import asdict, dataclass, field
+from typing import List, Literal, Union
+import yaml
+
+@dataclass
+class SplitterConfig:
+    val_split: float = 0.2
+    test_split: float = 0.2
+
+@dataclass
+class Splitter:
+    type: str = "StandardSplitter"
+    config: SplitterConfig = SplitterConfig()
+
+@dataclass
+class BatcherConfig:
+    train_batch_size: int = 50
+    val_batch_size: int = 50
+    test_batch_size: int = 50
+
+@dataclass
+class Batcher:
+    type: str = "StandardBatcher"
+    config: BatcherConfig = BatcherConfig()
+
+
+
+@dataclass
+class Task:
+    preprocessors: List = field(default_factory=lambda: ["ImageFlattner"])
+    loader: str = "mnist"
+
+    splitter: 'Splitter' = Splitter()
+    batcher: 'Batcher' = Batcher()
+
+
+
+@dataclass
+class HiddenLayerConfig:
+    start: Union[float, List[int]]
+    end: Union[float, List[int]]
+    step_size: Union[float, List[int]]
+
+@dataclass
+class Hyperparameter:
+    learning_rate: 'HiddenLayerConfig' = HiddenLayerConfig(start=1e-4, end=1e-2, step_size=1e-3)
+    weight_decay: 'HiddenLayerConfig' = HiddenLayerConfig(start=1e-6, end=1e-4, step_size=1e-5)
+    hidden_layer_config: 'HiddenLayerConfig' = HiddenLayerConfig(start=[10], end=[100, 100,100], step_size=[10, 1])
+
+@dataclass
+class Workload:
+    epochs: int = 100
+    task: Task = Task()
+    dl_framework: str = "torch"
+    model_cls: str = "mlp"
+    device: str = "cpu"
+
+@dataclass
+class Resouces:
+    """
+    Resource definition for a HPO benchmark
+
+    Args:
+        trials (int): Number of trials to run - up to the HPO
+        framework to enforce
+
+        metrics_ip (str): IP address of the metrics server (usually the same as the benchmark runner)
+
+        worker_cpu (int): Number of CPUs to allocate to each worker
+        worker_memory (int): Amount of memory to allocate to each worker in GB
+        worker_count (int): Number of workers to spawn (up to the HPO platform to translate to nodes)
+
+        workload (Workload): Workload definition for the benchmark, including the task, model, preprocessing, etc.
+
+        hyperparameter (Hyperparameter): Hyperparameter definition for the benchmark, including the search space size, etc.
+    """
+    metrics_ip: str = "auto" #TODO we should instead use a factory here
+
+    trials: int = 100
+
+    worker_cpu: int = 2
+    worker_memory: int = 2
+    worker_count: int = 1
+
+
+    workload: 'Workload' = Workload()
+    hyperparameter: 'Hyperparameter' = Hyperparameter()
+
+    def to_dict(self):
+        return asdict(self)
+
+    def to_yaml(self):
+        return yaml.dump(self.to_dict())
+
+
+
diff --git a/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py b/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py
index a83a96d..b4269e9 100644
--- a/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py
+++ b/experiments/optuna_kubernetes/optuna_kubernetes_benchmark.py
@@ -55,7 +55,7 @@ def deploy(self) -> None:
             Deploy DB
         """
         # TODO: deal with exsiting resources...
-
+        #XXX is this still relevant??!
         if self.hyperparameter:
             #TODO: XXX we got to fix this dependency thing. eitehr merge minikube/kubernetes or use the same baseclass or something...
             f = path.join(path.dirname(__file__),"..","optuna_minikube","hyperparameter_space.yml")
diff --git a/experiments/optuna_kubernetes/optuna_kubernetes_rmem.py b/experiments/optuna_kubernetes/optuna_kubernetes_rmem.py
new file mode 100644
index 0000000..2f6ec5e
--- /dev/null
+++ b/experiments/optuna_kubernetes/optuna_kubernetes_rmem.py
@@ -0,0 +1,41 @@
+import logging
+from os import path
+from time import sleep
+from experiments.optuna_kubernetes.optuna_kubernetes_benchmark import OptunaKubernetesBenchmark
+from basht.benchmark_runner import BenchmarkRunner
+from urllib.request import urlopen
+from basht.utils.yaml import YMLHandler
+
+if __name__ == "__main__":
+    metricsIP = urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip()
+
+    # read in base configuration
+    resources = YMLHandler.load_yaml(path.join(path.dirname(__file__), "resource_definition.yml"))
+    # TODO: XXX remove these hardcoded values
+    to_automate = {
+        "metricsIP": metricsIP,
+        "dockerImageTag": "tawalaya/optuna-trial:latest",
+        "dockerImageBuilder": "docker",
+        "kubernetesContext": "admin@smile",
+        "kubernetesMasterIP": "130.149.158.143",
+        "prometheus_url": "http://130.149.158.143:30041",
+        "deleteAfterRun": True,
+        "epochs": 50,
+    }
+    resources.update(to_automate)
+
+    repetitions = 2
+    for i in range(1,repetitions+1):
+        for n in range(1,7):
+            sleep(3)
+            logging.info(f"Starting Run {i} with {n} nodes with n_trials 100")
+            try:
+                resources["trials"] = 100
+                resources["workerCount"] = n
+                resources["goal"] = f"rnode{n}-100-{i}"
+                runner = BenchmarkRunner(benchmark_cls=OptunaKubernetesBenchmark, resources=resources)
+                runner.run()
+                sleep(7)
+                runner = None
+            except Exception as e:
+                logging.warning(f'Failed Run {i} with {n} nodes and n_trials 100 - {e}')
diff --git a/experiments/optuna_minikube/test_check_trial.py b/experiments/optuna_minikube/test_check_trial.py
index b5079a3..36c1b66 100644
--- a/experiments/optuna_minikube/test_check_trial.py
+++ b/experiments/optuna_minikube/test_check_trial.py
@@ -4,7 +4,7 @@
 from experiments.optuna_minikube.optuna_trial import main
 from basht.metrics_storage import MetricsStorage
 from basht.utils.yaml import YMLHandler
-
+from basht.resources import Resouces
 
 def test_check_trail():
     metrics_storage = MetricsStorage()
@@ -14,52 +14,15 @@ def test_check_trail():
         os.environ["METRICS_STORAGE_HOST"] = MetricsStorage.host
         os.environ["DB_CONN"] = MetricsStorage.connection_string
 
+        resource_def = Resouces()
+        resource_def.trials = 2
+        resource_def.workload.epochs = 2
 
-        # resource_path = os.path.join(os.path.dirname(__file__), "resource_definition.yml")
-        # resource_def = YMLHandler.load_yaml(resource_path)
-        resource_def = {
-            "trials": 2,
-            "workload": {
-                "dl_framework": "torch",
-                "task": {
-                    "loader": "mnist",
-                    "preprocessors": ["ImageFlattner"],
-                    "splitter": {
-                        "type": "StandardSplitter",
-                        "config": {
-                            "val_split": 0.2,
-                            "test_split": 0.2,
-                        },
-                    },
-                    "batcher": {
-                        "type": "StandardBatcher",
-                        "config": {
-                            "train_batch_size": 50,
-                            "val_batch_size": 50,
-                            "test_batch_size": 50,
-                        },
-                    },
-                },
-                "model_cls": "mlp",
-                "epochs": 2,
-                "device": "cpu",
-
-            },
-            "hyperparameter": {
-                "learning_rate": {
-                    "start": 1e-4,
-                    "step_size": 1e-3,
-                    "end": 1e-2
-                },
-
-            }
-        }
-
-        f = main(dict(resource_def))
+        f = main(resource_def.to_dict())
         assert f
 
         lats = metrics_storage.get_latency_results()
-        assert len(lats) >= int(resource_def.get("trials")) * \
+        assert len(lats) >= int(resource_def.trials) * \
             2  # (validate+train)
     finally:
         metrics_storage.stop_db()

From 191e128bce2060cef9ae26b129da999e238c2c40 Mon Sep 17 00:00:00 2001
From: "basti.werner"
Date: Fri, 11 Nov 2022 16:51:17 +0100
Subject: [PATCH 5/7] updated task to use Resouces as well; added feature to
 handle arbitrary keys; added yaml loader

---
 basht/benchmark_runner.py                   |  2 +-
 basht/resources.py                          | 44 +++++++++++++++----
 experiments/optuna_minikube/optuna_trial.py | 32 +++++++-------
 .../optuna_minikube/test_check_trial.py     |  5 +--
 test/conftest.py                            | 24 +++++-----
 test/hyperparameter_space.yml               |  6 ---
 test/test.yaml                              | 24 +++++++---
 test/test_resource.py                       | 26 +++++++++++
 test/test_workload/test_objective.py        | 15 ++-----
 test/test_yaml.py                           | 12 -----
 10 files changed, 115 insertions(+), 75 deletions(-)
 delete mode 100644 test/hyperparameter_space.yml
 create mode 100644 test/test_resource.py
 delete mode 100644 test/test_yaml.py

diff --git a/basht/benchmark_runner.py b/basht/benchmark_runner.py
index b203522..087ef60 100644
--- a/basht/benchmark_runner.py
+++ b/basht/benchmark_runner.py
@@ -16,7 +16,7 @@
 from basht.resource_tracker import ResourceTracker
 from basht.metrics import Latency
 
-
+#TODO: update to use Resources class
 class Benchmark(ABC):
     """
     This class serves as an Interface for a benchmark. All neccessary methods have to be implemented in the
diff --git a/basht/resources.py b/basht/resources.py
index ddffbf7..483c05b 100644
--- a/basht/resources.py
+++ b/basht/resources.py
@@ -1,5 +1,5 @@
-from dataclasses import asdict, dataclass, field
-from typing import List, Literal, Union
+from dataclasses import asdict, dataclass, field, fields, is_dataclass
+from typing import List, Dict, Union, Any
 import yaml
 
 @dataclass
@@ -32,6 +32,9 @@ class Task:
 
     splitter: 'Splitter' = Splitter()
     batcher: 'Batcher' = Batcher()
+
+    def to_dict(self):
+        return asdict(self)
 
 
 
@@ -47,6 +50,9 @@ class Hyperparameter:
     weight_decay: 'HiddenLayerConfig' = HiddenLayerConfig(start=1e-6, end=1e-4, step_size=1e-5)
     hidden_layer_config: 'HiddenLayerConfig' = HiddenLayerConfig(start=[10], end=[100, 100,100], step_size=[10, 1])
 
+    def to_dict(self):
+        return asdict(self)
+
 @dataclass
 class Workload:
     epochs: int = 100
@@ -55,7 +61,7 @@ class Workload:
     model_cls: str = "mlp"
     device: str = "cpu"
 
-@dataclass
+@dataclass(init=False)
 class Resouces:
     """
     Resource definition for a HPO benchmark
 
     Args:
         trials (int): Number of trials to run - up to the HPO
         framework to enforce
 
         metrics_ip (str): IP address of the metrics server (usually the same as the benchmark runner)
 
         worker_cpu (int): Number of CPUs to allocate to each worker
         worker_memory (int): Amount of memory to allocate to each worker in GB
         worker_count (int): Number of workers to spawn (up to the HPO platform to translate to nodes)
 
         workload (Workload): Workload definition for the benchmark, including the task, model, preprocessing, etc.
 
         hyperparameter (Hyperparameter): Hyperparameter definition for the benchmark, including the search space size, etc.
     """
-    metrics_ip: str = "auto" #TODO we should instead use a factory here
+    metricsIP: str = "auto" #TODO we should instead use a factory here
 
     trials: int = 100
 
-    worker_cpu: int = 2
-    worker_memory: int = 2
-    worker_count: int = 1
+    workerCpu: int = 2
+    workerMemory: int = 2
+    workerCount: int = 1
 
 
-    workload: 'Workload' = Workload()
-    hyperparameter: 'Hyperparameter' = Hyperparameter()
+    workload: Workload = Workload()
+    hyperparameter: Hyperparameter = Hyperparameter()
+
+    args: Dict[str, Any] = field(default_factory=dict)
+
+    def __init__(self, **kwargs):
+        self.args = dict()
+        names = set([f.name for f in fields(self)])
+        types = dict([(f.name,f.type) for f in fields(self)])
+        for k, v in kwargs.items():
+            if k in names:
+                if is_dataclass(types[k]):
+                    v = types[k](**v)
+                else:
+                    setattr(self, k, v)
+            else:
+                self.args[k] = v
 
     def to_dict(self):
         return asdict(self)
 
     def to_yaml(self):
         return yaml.dump(self.to_dict())
 
+    @staticmethod
+    def from_yaml(yaml_path:str):
+        with open(yaml_path, "r") as f:
+            return Resouces(**yaml.load(f, Loader=yaml.FullLoader))
+
 
diff --git a/experiments/optuna_minikube/optuna_trial.py b/experiments/optuna_minikube/optuna_trial.py
index 93a2ad3..7628a99 100644
--- a/experiments/optuna_minikube/optuna_trial.py
+++ b/experiments/optuna_minikube/optuna_trial.py
@@ -1,12 +1,14 @@
 import os
 import sys
 from time import sleep
+
 import optuna
-from basht.workload.objective import Objective
-from basht.utils.yaml import YMLHandler
-from utils import generate_search_space
 from optuna.study import MaxTrialsCallback
 from optuna.trial import TrialState
+from utils import generate_search_space
+
+from basht.resources import Resouces
+from basht.workload.objective import Objective
 
 
 class OptunaTrial:
@@ -39,27 +41,25 @@ def __call__(self, trial):
         validation_scores = self.objective.validate()
         return validation_scores["macro avg"]["f1-score"]
 
-
-def main(resource_def):
+def main(resource:Resouces):
     try:
         study_name = os.environ.get("STUDY_NAME", "Test-Study")
         database_conn = os.environ.get("DB_CONN")
-
-        hyperparameter = resource_def.get("hyperparameter")
-        search_space = generate_search_space(hyperparameter)
-        workload_def = resource_def.get("workload")
-        n_trials = resource_def.get("trials")
+
+        #TODO migrate generate_search_space to use Resource.hyperparameter instead of dict
+        search_space = generate_search_space(resource.hyperparameter.to_dict())
+        workload_def = resource.workload
         optuna_trial = OptunaTrial(
-            search_space, dl_framework=workload_def.get("dl_framework"),
-            model_cls=workload_def.get("model_cls"),
-            epochs=workload_def.get("epochs"), device=workload_def.get("device"),
-            task=workload_def.get("task"))
+            search_space, dl_framework=workload_def.dl_framework,
+            model_cls=workload_def.model_cls,
+            epochs=workload_def.epochs, device=workload_def.device,
+            task=workload_def.task.to_dict())
         study = optuna.create_study(
             study_name=study_name, storage=database_conn, direction="maximize",
             load_if_exists=True, sampler=optuna.samplers.GridSampler(search_space))
         study.optimize(
             optuna_trial,
-            callbacks=[MaxTrialsCallback(n_trials, states=(TrialState.COMPLETE,))])
+            callbacks=[MaxTrialsCallback(resource.trials, states=(TrialState.COMPLETE,))])
         sleep(5)
         return True
     except Exception as e:
@@ -69,7 +69,7 @@
 
 if __name__ == "__main__":
     resource_path = os.path.join(os.path.dirname(__file__), "resource_definition.yml")
-    resource_def = YMLHandler.load_yaml(resource_path)
+    resource_def = Resouces.from_yaml(resource_path)
     if main(resource_def):
         sys.exit(0)
     else:
diff --git a/experiments/optuna_minikube/test_check_trial.py b/experiments/optuna_minikube/test_check_trial.py
index 36c1b66..a6e13ec 100644
--- a/experiments/optuna_minikube/test_check_trial.py
+++ b/experiments/optuna_minikube/test_check_trial.py
@@ -18,12 +18,11 @@ def test_check_trail():
         resource_def.trials = 2
         resource_def.workload.epochs = 2
 
-        f = main(resource_def.to_dict())
+        f = main(resource_def)
         assert f
 
         lats = metrics_storage.get_latency_results()
-        assert len(lats) >= int(resource_def.trials) * \
-            2  # (validate+train)
+        assert len(lats) >= int(resource_def.trials) * 2  # (validate+train)
     finally:
         metrics_storage.stop_db()
 
diff --git a/test/conftest.py b/test/conftest.py
index 07459c7..f3196f8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -6,6 +6,7 @@
 from basht.config import Path
 from basht.utils.yaml import YMLHandler
 from basht.workload.objective import Objective
+from basht.resources import Resouces
 import torch
 
 
@@ -50,24 +51,23 @@ def prometeus_url():
 
 @pytest.fixture
 def resource_definition():
-    test_file_path = os.path.join(Path.root_path, "test/test.yaml")
-    def_dict = YMLHandler.load_yaml(test_file_path)
-    return def_dict
+    res = Resouces()
+    res.trials = 2
+    res.workload.epochs = 2
+
+    return res
 
 
 @pytest.fixture
 def prepared_objective():
-    test_file_path = os.path.join(Path.root_path, "test/test.yaml")
-    resource_definition = YMLHandler.load_yaml(test_file_path)
-    workload_def = resource_definition["workload"]
-    dl_framework = workload_def["dl_framework"]
-    model_cls = workload_def["model_cls"]
-    epochs = workload_def["epochs"]
-    device = "cuda" if torch.cuda.is_available() else "cpu"
-    task = workload_def["task"]
+    res = Resouces()
+    res.trials = 2
+    res.workload.epochs = 2
+    res.workload.device = "cuda" if torch.cuda.is_available() else "cpu"
+    task = res.workload.task.to_dict()
 
     # test
     objective = Objective(
-        dl_framework=dl_framework, model_cls=model_cls, epochs=epochs, device=device,
+        dl_framework=res.workload.dl_framework, model_cls=res.workload.model_cls, epochs=res.workload.epochs, device=res.workload.device,
         task=task)
     return objective
diff --git a/test/hyperparameter_space.yml b/test/hyperparameter_space.yml
deleted file mode 100644
index eb80612..0000000
--- a/test/hyperparameter_space.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-# generated file - do not edit
-hidden_layer_config:
-  end: [10, 10]
-  start: [10]
-  step_size: [10, 1]
-learning_rate: {end: 0.01, start: 0.0001, step_size: 1e-05}
diff --git a/test/test.yaml b/test/test.yaml
index 85662ce..2103d0f 100644
--- a/test/test.yaml
+++ b/test/test.yaml
@@ -1,11 +1,18 @@
+
+workerCpu: 2
+workerMemory: 2
+workerCount: 4
+trials: 5
+metricsIP: auto ##urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip(),
+kubernetesMasterIP: minikube ##subprocess.check_output("minikube ip", shell=True).decode("utf-8").strip("\n")
+dockerImageTag: tawalaya/optuna-trial:latest
+dockerImageBuilder: minikube
+kubernetesNamespace: optuna-study
 kubernetesContext: "minikube"
-metricsIP: auto
-kubernetesMasterIP: minikube
-deleteAfterRun: true
+deleteAfterRun: True
 workload: # this is given to every trial and thus defines the performed work
   dl_framework: torch
   task:
-    loader: mnist
     preprocessors:
       - ImageFlattner
     splitter:
@@ -26,8 +33,15 @@ hyperparameter:
   learning_rate:
     start: 1e-4
     end: 1e-2
+    step_size: 1e-3
+  weight_decay:
+    start: 1e-6
+    end: 1e-4
     step_size: 1e-5
   hidden_layer_config:
     start: [10]
-    end: [10, 10]
+    end: [100, 100, 100]
     step_size: [10, 1]
+
+
+# TODO: distinguish between Platform and Application definition as well
diff --git a/test/test_resource.py b/test/test_resource.py
new file mode 100644
index 0000000..70e43b8
--- /dev/null
+++ b/test/test_resource.py
@@ -0,0 +1,26 @@
+from os import path
+from basht.utils.yaml import YMLHandler
+from basht.resources import Resouces
+
+def test_resource():
+    res = Resouces()
+
+    assert res.trials == 100
+    assert res.workload.device == "cpu"
+
+
+
+def test_yaml():
+    resources_path = path.join(path.dirname(__file__), "test.yaml")
+    resources = Resouces.from_yaml(resources_path)
+
+    #check read value
+    assert resources.trials == 5
+    #check default collision (same value as in default)
+    assert resources.workload.device == "cpu"
+
+    #check default value for missing key
+    assert resources.workload.task.loader == "mnist"
+
+    #check extra key
+    assert resources.args["kubernetesContext"]
\ No newline at end of file
diff --git a/test/test_workload/test_objective.py b/test/test_workload/test_objective.py
index c01f7ac..c997ce2 100644
--- a/test/test_workload/test_objective.py
+++ b/test/test_workload/test_objective.py
@@ -1,25 +1,18 @@
 import torch
 
 from basht.workload.objective import Objective
-
+from basht.resources import Resouces
 
 class TestObjective:
 
     def test_objective(self, resource_definition):
-        # setup
-        workload_def = resource_definition["workload"]
-        dl_framework = workload_def["dl_framework"]
-        model_cls = workload_def["model_cls"]
-        epochs = workload_def["epochs"]
-        device = "cuda" if torch.cuda.is_available() else "cpu"
-        task = workload_def["task"]
+
         # test
         objective = Objective(
-            dl_framework=dl_framework, model_cls=model_cls, epochs=epochs, device=device,
-            task=task)
+            dl_framework=resource_definition.workload.dl_framework, model_cls=resource_definition.workload.model_cls, epochs=resource_definition.workload.epochs, device=resource_definition.workload.device,
+            task=resource_definition.workload.task.to_dict())
         objective.train()
 
         # assert
         assert objective
-        assert isinstance(objective.workload_definition, dict)
         assert objective._functional_objective
diff --git a/test/test_yaml.py b/test/test_yaml.py
deleted file mode 100644
index b667232..0000000
--- a/test/test_yaml.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from os import path
-from basht.utils.yaml import YMLHandler
-
-
-def test_yaml():
-    resources = YMLHandler.load_yaml(path.join(path.dirname(__file__), "test.yaml"))
-    assert resources["deleteAfterRun"]
-
-    YMLHandler.as_yaml(
-        path.join(path.dirname(__file__), "hyperparameter_space.yml"), resources["hyperparameter"])
-    params = YMLHandler.load_yaml(path.join(path.dirname(__file__), "hyperparameter_space.yml"))
-    assert params == resources["hyperparameter"]

From 281e90337ddeb828a3081e7f67fe9e091f39c81b Mon Sep 17 00:00:00 2001
From: Michael Gebauer | TU
Date: Wed, 23 Nov 2022 20:22:23 +0100
Subject: [PATCH 6/7] minor adjustments

---
 basht/decorators.py                         |  1 +
 basht/resources.py                          | 87 ++++++++++++-------
 basht/results_tracker.py                    |  7 +-
 experiments/optuna_minikube/optuna_trial.py | 16 ++--
 .../raytune_kubernetes/raytune_benchmark.py |  2 -
 test/conftest.py                            |  6 +-
 test/test_resource.py                       | 19 ++--
 7 files changed, 81 insertions(+), 57 deletions(-)

diff --git a/basht/decorators.py b/basht/decorators.py
index 1226b20..563d6d9 100644
--- a/basht/decorators.py
+++ b/basht/decorators.py
@@ -29,6 +29,7 @@ def result_func(*args, **kwargs):
 
     return result_func
 
+
 def latency_decorator(func):
     """A Decorator to record the latency of the decorated function.
     Once it is recorded the LatencyTracker writes the result into the postgres databse.
diff --git a/basht/resources.py b/basht/resources.py
index 483c05b..07fb1ae 100644
--- a/basht/resources.py
+++ b/basht/resources.py
@@ -2,57 +2,61 @@
 from typing import List, Dict, Union, Any
 import yaml
 
+
 @dataclass
 class SplitterConfig:
     val_split: float = 0.2
     test_split: float = 0.2
 
+
 @dataclass
 class Splitter:
     type: str = "StandardSplitter"
     config: SplitterConfig = SplitterConfig()
 
+
 @dataclass
 class BatcherConfig:
     train_batch_size: int = 50
     val_batch_size: int = 50
     test_batch_size: int = 50
 
+
 @dataclass
 class Batcher:
     type: str = "StandardBatcher"
     config: BatcherConfig = BatcherConfig()
 
 
 @dataclass
-class Task:
+class Task:  # TODO: need to rename as this might create ambiguity at some point
     preprocessors: List = field(default_factory=lambda: ["ImageFlattner"])
     loader: str = "mnist"
 
     splitter: 'Splitter' = Splitter()
     batcher: 'Batcher' = Batcher()
 
     def to_dict(self):
         return asdict(self)
 
 
 @dataclass
-class HiddenLayerConfig:
+class SearchRange:
     start: Union[float, List[int]]
     end: Union[float, List[int]]
     step_size: Union[float, List[int]]
 
+
 @dataclass
-class Hyperparameter:
-    learning_rate: 'HiddenLayerConfig' = HiddenLayerConfig(start=1e-4, end=1e-2, step_size=1e-3)
-    weight_decay: 'HiddenLayerConfig' = HiddenLayerConfig(start=1e-6, end=1e-4, step_size=1e-5)
-    hidden_layer_config: 'HiddenLayerConfig' = HiddenLayerConfig(start=[10], end=[100, 100,100], step_size=[10, 1])
+class SearchSpace:
+    learning_rate: 'SearchRange' = SearchRange(start=1e-4, end=1e-2, step_size=1e-3)
+    weight_decay: 'SearchRange' = SearchRange(start=1e-6, end=1e-4, step_size=1e-5)
+    hidden_layer_config: 'SearchRange' = SearchRange(start=[10], end=[100, 100, 100], step_size=[10, 1])
 
     def to_dict(self):
         return asdict(self)
 
+
 @dataclass
 class Workload:
     epochs: int = 100
@@ -61,62 +65,85 @@ class Workload:
     model_cls: str = "mlp"
     device: str = "cpu"
 
+
 @dataclass(init=False)
-class Resouces:
+class Resources:
     """
     Resource definition for a HPO benchmark
 
     Args:
-        trials (int): Number of trials to run - up to the HPO
+        trials (int): Number of trials to run - up to the HPO
         framework to enforce
-
+
         metrics_ip (str): IP address of the metrics server (usually the same as the benchmark runner)
-
+
         worker_cpu (int): Number of CPUs to allocate to each worker
         worker_memory (int): Amount of memory to allocate to each worker in GB
         worker_count (int): Number of workers to spawn (up to the HPO platform to translate to nodes)
 
-        workload (Workload): Workload definition for the benchmark, including the task, model, preprocessing, etc.
+        workload (Workload): Workload definition for the benchmark, including the task, model,
+        preprocessing, etc.
 
-        hyperparameter (Hyperparameter): Hyperparameter definition for the benchmark, including the search space size, etc.
+        hyperparameter (Hyperparameter): Hyperparameter definition for the benchmark,
+        including the search space size, etc.
     """
-    metricsIP: str = "auto" #TODO we should instead use a factory here
+    metricsIP: str = "auto"  # TODO we should instead use a factory here
 
     trials: int = 100
 
     workerCpu: int = 2
     workerMemory: int = 2
     workerCount: int = 1
+    deleteAfterRun: bool = True
 
     workload: Workload = Workload()
-    hyperparameter: Hyperparameter = Hyperparameter()
+    searchspace: SearchSpace = field(default_factory=dict)  # TODO: SearchSpace()
 
     args: Dict[str, Any] = field(default_factory=dict)
 
     def __init__(self, **kwargs):
         self.args = dict()
-        names = set([f.name for f in fields(self)])
-        types = dict([(f.name,f.type) for f in fields(self)])
+        self._field_names = set([f.name for f in fields(self)])
+        self._field_types = {f.name: f.type for f in fields(self)}
+        self._set_arguments(kwargs)
+
+    def _set_arguments(self, kwargs):
+        """This function sets all attributes for the resource class. Some attributes are required by basht,
+        some are custom for the benchmark.
+
+        Args:
+            kwargs (_type_): _description_
+        """
         for k, v in kwargs.items():
-            if k in names:
-                if is_dataclass(types[k]):
-                    v = types[k](**v)
+            if k in self._field_names:
+                if is_dataclass(self._field_types[k]):
+                    v = self._field_types[k](**v)
                 else:
-                    setattr(self, k, v)
+                    setattr(self, k, v)  # enables custom attributes
             else:
                 self.args[k] = v
 
+    def update(self, **kwargs):
+        self._set_arguments(kwargs)
+
     def to_dict(self):
         return asdict(self)
 
     def to_yaml(self):
         return yaml.dump(self.to_dict())
 
+    def get(self, key):
+        value = self.args.get(key)
+        if value:
+            return value
+        else:
+            try:
+                value = self.__getattribute__(key)
+                return value
+            except AttributeError:
+                raise AttributeError("The resource definition element {key} is not defined.")
+
     @staticmethod
-    def from_yaml(yaml_path:str):
+    def from_yaml(yaml_path: str):
         with open(yaml_path, "r") as f:
-            return Resouces(**yaml.load(f, Loader=yaml.FullLoader))
+            return Resources(**yaml.load(f, Loader=yaml.FullLoader))
diff --git a/basht/results_tracker.py b/basht/results_tracker.py
index e8cc7e7..04276c7 100644
--- a/basht/results_tracker.py
+++ b/basht/results_tracker.py
@@ -1,10 +1,11 @@
 import logging
-from basht.latency_tracker import Tracker #TODO: move to utils
+from basht.latency_tracker import Tracker
 from basht.metrics import Result
 from basht.metrics_storage import MetricsStorageStrategy
 
+
 class ResultTracker(Tracker):
-    def __init__(self,store=MetricsStorageStrategy):
+    def __init__(self, store=MetricsStorageStrategy):
         self.store = store()
         self.store.setup()
 
@@ -18,7 +19,7 @@ def track(self, objective_function, result):
         r.classification_metrics = result
 
         try:
-            self.store.store(r,table_name="classification_metrics")
+            self.store.store(r, table_name="classification_metrics")
             logging.info("Stored result")
         except Exception as e:
             logging.error(f"failed to store result {e}")
diff --git a/experiments/optuna_minikube/optuna_trial.py b/experiments/optuna_minikube/optuna_trial.py
index 7760e85..c726c0f 100644
--- a/experiments/optuna_minikube/optuna_trial.py
+++ b/experiments/optuna_minikube/optuna_trial.py
@@ -25,11 +25,11 @@ def __call__(self, trial):
         hidden_layer_idx = trial.suggest_categorical(
             "hidden_layer_config", list(self.search_space["hidden_layer_config"].keys()))
         lr = trial.suggest_float(
-            "learning_rate", self.search_space["learning_rate"].min(),
-            self.search_space["learning_rate"].max(), log=True)
+            "learning_rate", min(self.search_space["learning_rate"]),
+            max(self.search_space["learning_rate"]), log=True)
         decay = trial.suggest_float(
-            "weight_decay", self.search_space["weight_decay"].min(),
-            self.search_space["weight_decay"].max(), log=True)
+            "weight_decay", min(self.search_space["weight_decay"]),
+            max(self.search_space["weight_decay"]), log=True)
         hyperparameter = {
             "learning_rate": lr, "weight_decay": decay,
             "hidden_layer_config": self.search_space.get("hidden_layer_config")[hidden_layer_idx]
@@ -55,14 +55,14 @@ def pruning_function(trial, objective_storage_interface):
             raise optuna.TrialPruned()
 
 
-def main(resource: Resources):
+def main(resource_def: Resources):
     try:
         study_name = os.environ.get("STUDY_NAME", "Test-Study")
         database_conn = os.environ.get("DB_CONN")
 
         # TODO migrate generate_search_space to use Resource.hyperparameter instead of dict
-        search_space = generate_grid_search_space(resource.hyperparameter.to_dict())
-        workload_def = resource.workload
+        search_space = generate_grid_search_space(resource_def.hyperparameter.to_dict())
+        workload_def = resource_def.workload
         optuna_trial = OptunaTrial(
             search_space, dl_framework=workload_def.dl_framework,
             model_cls=workload_def.model_cls,
@@ -73,7 +73,7 @@ def main(resource_def: Resources):
             sampler=optuna.samplers.GridSampler(search_space), pruner=optuna.pruners.MedianPruner())
         study.optimize(
             optuna_trial,
-            callbacks=[MaxTrialsCallback(resource.trials, states=(TrialState.COMPLETE,))])
+            callbacks=[MaxTrialsCallback(resource_def.trials, states=(TrialState.COMPLETE,))])
         sleep(5)
         return True
     except Exception as e:
diff --git a/experiments/raytune_kubernetes/raytune_benchmark.py b/experiments/raytune_kubernetes/raytune_benchmark.py
index ef0d13a..f4f56c6 100644
--- a/experiments/raytune_kubernetes/raytune_benchmark.py
+++ b/experiments/raytune_kubernetes/raytune_benchmark.py
@@ -1,5 +1,3 @@
-import subprocess
-import time
 from os import path
 
 import ray
diff --git a/test/conftest.py b/test/conftest.py
index f3196f8..2d93e89 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -3,10 +3,8 @@
 import os
 
 from basht.decorators import latency_decorator, validation_latency_decorator
-from basht.config import Path
-from basht.utils.yaml import YMLHandler
 from basht.workload.objective import Objective
-from basht.resources import Resouces
+from basht.resources import Resources
 import torch
 
 
@@ -49,7 +49,7 @@ def prometeus_url():
 
 @pytest.fixture
 def resource_definition():
-    res = Resouces()
+    res = Resources()
     res.trials = 2
     res.workload.epochs = 2
 
diff --git a/test/test_resource.py b/test/test_resource.py
index 70e43b8..612daea 100644
--- a/test/test_resource.py
+++ b/test/test_resource.py
@@ -1,26 +1,25 @@
 from os import path
-from basht.utils.yaml import YMLHandler
-from basht.resources import Resouces
+from basht.resources import Resources
+
 
 def test_resource():
-    res = Resouces()
+    res = Resources()
 
     assert res.trials == 100
     assert res.workload.device == "cpu"
 
 
-
 def test_yaml():
     resources_path = path.join(path.dirname(__file__), "test.yaml")
-    resources = Resouces.from_yaml(resources_path)
+    resources = Resources.from_yaml(resources_path)
 
-    #check read value
+    # check read value
     assert resources.trials == 5
-    #check default collision (same value as in default)
+    # check default collision (same value as in default)
     assert resources.workload.device == "cpu"
 
-    #check default value for missing key
+    # check default value for missing key
     assert resources.workload.task.loader == "mnist"
 
-    #check extra key
-    assert resources.args["kubernetesContext"]
\ No newline at end of file
+    # check extra key
+    assert resources.args["kubernetesContext"]

From 84e79ddb6efdcffb3b098ca9e9e5234c2c011482 Mon Sep 17 00:00:00 2001
From: Michael Gebauer | TU
Date: Wed, 23 Nov 2022 20:36:59 +0100
Subject: [PATCH 7/7] minor adjustments (yet a lot to be done)

---
 basht/resources.py                              | 2 +-
 experiments/optuna_minikube/optuna_trial.py     | 4 ++--
 experiments/optuna_minikube/test_check_trial.py | 5 ++---
 test/conftest.py                                | 2 +-
 4 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/basht/resources.py b/basht/resources.py
index 07fb1ae..9c84e14 100644
--- a/basht/resources.py
+++ b/basht/resources.py
@@ -97,7 +97,7 @@ class Resources:
     deleteAfterRun: bool = True
 
     workload: Workload = Workload()
-    searchspace: SearchSpace = field(default_factory=dict)  # TODO: SearchSpace()
+    searchspace: SearchSpace = SearchSpace()
 
     args: Dict[str, Any] = field(default_factory=dict)
 
diff --git a/experiments/optuna_minikube/optuna_trial.py b/experiments/optuna_minikube/optuna_trial.py
index c726c0f..24b7eb1 100644
--- a/experiments/optuna_minikube/optuna_trial.py
+++ b/experiments/optuna_minikube/optuna_trial.py
@@ -60,8 +60,8 @@ def main(resource_def: Resources):
         study_name = os.environ.get("STUDY_NAME", "Test-Study")
         database_conn = os.environ.get("DB_CONN")
 
-        # TODO migrate generate_search_space to use Resource.hyperparameter instead of dict
-        search_space = generate_grid_search_space(resource_def.hyperparameter.to_dict())
+        # TODO migrate generate_search_space to use Resource.hyperparameter instead of dict - whole example doesn't work yet
+        search_space = generate_grid_search_space(resource_def.searchspace.to_dict())
         workload_def = resource_def.workload
         optuna_trial = OptunaTrial(
             search_space, dl_framework=workload_def.dl_framework,
diff --git a/experiments/optuna_minikube/test_check_trial.py b/experiments/optuna_minikube/test_check_trial.py
index dcb6ab9..527fd50 100644
--- a/experiments/optuna_minikube/test_check_trial.py
+++ b/experiments/optuna_minikube/test_check_trial.py
@@ -16,10 +16,9 @@ def test_check_trail():
         resource_def.workload.epochs = 2
 
         f = main(resource_def)
-        assert f
-
         lats = metrics_storage.get_latency_results()
+
+        assert f
+        assert len(lats) >= int(resource_def.trials) * 2  # TODO: this is too implicit
     finally:
         metrics_storage.stop_db()
 
diff --git a/test/conftest.py b/test/conftest.py
index 2d93e89..46180ec 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -58,7 +58,7 @@ def resource_definition():
 
 @pytest.fixture
 def prepared_objective():
-    res = Resouces()
+    res = Resources()
     res.trials = 2
     res.workload.epochs = 2
    res.workload.device = "cuda" if torch.cuda.is_available() else "cpu"
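
Editor's note: the series above replaces ad-hoc YAML dictionaries with the Resources dataclass as the single entry point for benchmark configuration. The sketch below is not part of the patch series; it only illustrates, based on test_resource.py, conftest.py, and the experiment scripts in these patches, how the class is apparently meant to be consumed after patches 4-7 are applied. The YAML path refers to the test fixture from PATCH 5 and assumes the working directory is the repository root; everything else uses only methods introduced above (from_yaml, update, get, to_yaml).

    # usage_sketch.py - hypothetical example, not included in any patch above
    from basht.resources import Resources

    # Construct with defaults, then override fields in code,
    # mirroring the resource_definition fixture in test/conftest.py.
    res = Resources()
    res.trials = 2
    res.workload.epochs = 2
    assert res.workload.device == "cpu"  # default, as asserted in test_resource()

    # Load a full definition from YAML; keys that are not dataclass fields
    # (e.g. kubernetesContext) are collected in res.args instead of raising,
    # which is exactly what test_yaml() exercises.
    res = Resources.from_yaml("test/test.yaml")
    assert res.trials == 5
    assert res.args["kubernetesContext"]

    # get() falls back from res.args to regular attributes, and update()
    # merges overrides the way optuna_kubernetes_rmem.py patches its
    # configuration dict before each run.
    res.update(trials=100, workerCount=4)
    print(res.get("kubernetesContext"), res.get("trials"))
    print(res.to_yaml())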