3 changes: 1 addition & 2 deletions .gitignore
@@ -139,5 +139,4 @@ data/
 #
 .envs
 test/test_ml_benchmark/hyperparameter_space.yml
-portforward_log.txt
-.vscode
+.vscode/
4 changes: 2 additions & 2 deletions README.md
@@ -10,8 +10,8 @@ This repository supplies a job to benchmark hyperparameter tuning.

 ### Prerequisites

-* Ubuntu >= 16.04 (not tested on macOS)
-* Python >= 3.7.0
+* Ubuntu >= 20.04 (not tested on macOS)
+* Python >= 3.9.13
 * [PyTorch](https://pytorch.org/get-started/locally/)

 Note: If you run your benchmark on a GPU, make sure to install [CUDA](https://docs.nvidia.com/cuda/cuda-installation-guide-microsoft-windows/index.html) and the PyTorch version that matches your CUDA version.
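An illustrative way to confirm that the installed PyTorch build matches your CUDA setup before running the benchmark on GPU (this snippet is not part of the README):

```python
# Illustrative check (not part of the README) that the installed PyTorch
# build matches the CUDA toolkit before running the benchmark on GPU.
import torch

print(torch.__version__)          # installed PyTorch version
print(torch.version.cuda)         # CUDA version PyTorch was built against (None on CPU-only builds)
print(torch.cuda.is_available())  # True only if a compatible GPU and driver are present
```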
4 changes: 3 additions & 1 deletion basht/__init__.py
@@ -4,7 +4,9 @@
 __version__ = "develop"
 install_requires = [
     "scikit-learn==0.24.2",
-    "tqdm==4.64.1", "SQLAlchemy==1.4.31", "docker==4.4.2",
+    "tqdm==4.64.1",
+    "SQLAlchemy==1.4.31",
+    "docker==4.4.2",
     "psycopg2-binary",
     "prometheus-api-client==0.5.1",
     "ruamel.yaml==0.17.21"],
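For context, the `install_requires` list defined here is typically consumed by the package's setup script. A minimal sketch of that wiring, assuming a conventional `setup.py` (the setup script itself is not shown in this diff):

```python
# Hypothetical consumer of the pinned dependency list above, assuming a
# conventional setup.py (not shown in this diff).
from setuptools import find_packages, setup

import basht

setup(
    name="basht",
    version=basht.__version__,
    packages=find_packages(),
    install_requires=basht.install_requires,
)
```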
2 changes: 1 addition & 1 deletion basht/benchmark_runner.py
@@ -16,7 +16,7 @@
 from basht.resource_tracker import ResourceTracker
 from basht.metrics import Latency

-
+# TODO: update to use Resources class
 class Benchmark(ABC):
     """
     This class serves as an interface for a benchmark. All necessary methods have to be implemented in the
1 change: 1 addition & 0 deletions basht/decorators.py
@@ -29,6 +29,7 @@ def result_func(*args, **kwargs):

     return result_func

+
 def latency_decorator(func):
     """A decorator to record the latency of the decorated function. Once it is recorded, the LatencyTracker
     writes the result into the Postgres database.
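The docstring above describes the general pattern. A generic sketch of such a decorator, with a `print` standing in for the LatencyTracker's database write (this is not the repository's implementation):

```python
# Generic sketch of the timing-decorator pattern the docstring describes;
# the print stands in for the LatencyTracker's database write.
import time
from functools import wraps


def timed(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")  # a real tracker would persist this
        return result
    return wrapper


@timed
def objective():
    time.sleep(0.1)


objective()
```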
2 changes: 2 additions & 0 deletions basht/metrics.py
@@ -34,6 +34,7 @@ def __init__(self, node_id):
         self.network_usage = None
         self.accelerator_usage = None
         self.wattage = None
+        self.nfs_usage = None
         self.processes = None

     def to_dict(self):
@@ -43,6 +44,7 @@ def to_dict(self):
             cpu_usage=self.cpu_usage,
             memory_usage=self.memory_usage,
             network_usage=self.network_usage,
+            nfs_usage=self.nfs_usage,
             wattage=self.wattage,
             processes=int(self.processes),
         )
1 change: 1 addition & 0 deletions basht/metrics_storage.py
@@ -108,6 +108,7 @@ def create_resource_table(self):
             Column("cpu_usage", Float),
             Column("memory_usage", Float),
             Column("network_usage", Float),
+            Column("nfs_usage", Float),
             Column("accelerator_usage", Float),
             Column("wattage", Float),
             Column("processes", Integer),
10 changes: 7 additions & 3 deletions basht/resource_tracker.py
@@ -42,7 +42,7 @@ def _check_metrics(self):
         available = set(self.prm.all_metrics())

         # check node_exporter metrics - cpu/memory
-        required = {"node_memory_MemFree_bytes", "node_memory_MemTotal_bytes", "node_cpu_seconds_total", "scaph_host_power_microwatts", "scaph_process_power_consumption_microwatts"}
+        required = {"node_memory_MemFree_bytes", "node_memory_MemTotal_bytes", "node_cpu_seconds_total", "scaph_host_power_microwatts", "scaph_process_power_consumption_microwatts", "node_nfs_rpcs_total"}
         if not required.issubset(available):
             raise ValueError("Prometheus does not provide the required metrics.")

@@ -74,7 +74,7 @@ def _query(self):
         # ? is there a better way to map nodes using the node_exporter
         memory = 'avg by (instance) (node_memory_MemFree_bytes/node_memory_MemTotal_bytes)'
         cpu = '100 - (avg by (instance) (irate(node_cpu_seconds_total{mode="idle"}[2m])*100))'
-
+        nfs = 'sum by (node) (node_nfs_rpcs_total)'
         ## needs mapping
         network = f'sum by (instance) (rate({self.network_metric}_receive_bytes_total[2m])+rate({self.network_metric}_transmit_bytes_total[2m]))'
         # TODO: reduce measurements to only the ones we care about - does currently not work with scaph_process_power_consumption_microwatts
@@ -90,6 +90,7 @@ def _query(self):
         cpu_result = self.prm.custom_query(cpu)
         network_result = self.prm.custom_query(network)
         wattage_result = self.prm.custom_query(wattage)
+        nfs_result = self.prm.custom_query(nfs)
         processes_result = self.prm.custom_query(processes)

         logging.debug("Got results from Prometheus.", mem_result, cpu_result, network_result)
@@ -99,11 +100,13 @@ def _query(self):
         # grab the data per instance
         mem_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), mem_result))
         cpu_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), cpu_result))
+
         network_result = dict(map(lambda x: (self._try_norm(x["metric"]["instance"]), float(x["value"][1])), network_result))
         wattage_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), wattage_result))
         processes_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), processes_result))
+        nfs_result = dict(map(lambda x: (self._try_norm(x["metric"]["node"]), float(x["value"][1])), nfs_result))

-        logging.debug("Processed Prometheus Results", mem_result, cpu_result, network_result, wattage_result, processes_result)
+        logging.debug("Processed Prometheus Results", mem_result, cpu_result, network_result, wattage_result, processes_result, nfs_result)

         # assert mem_result.keys() == cpu_result.keys() == network_result.keys()

@@ -115,6 +118,7 @@ def _query(self):
             n.cpu_usage = cpu_result.get(instance, 0)
             n.memory_usage = mem_result.get(instance, 0)
             n.network_usage = network_result.get(instance, 0)
+            n.nfs_usage = nfs_result.get(instance, 0)
             if instance in wattage_result:
                 n.wattage = wattage_result[instance]
                 n.processes = processes_result[instance]
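For reference, `custom_query` in prometheus-api-client returns the Prometheus instant-query result list, which the `dict(map(...))` calls above flatten into `{node: value}` mappings. A self-contained sketch with a made-up payload (the node names and values are illustrative):

```python
# Sketch (not from the PR): flattening a prometheus-api-client instant-query
# result into a {node: value} dict, mirroring the dict(map(...)) calls above.
# The payload shape follows the Prometheus HTTP API: one dict per series,
# with "value" holding [timestamp, value-as-string].
sample_nfs_result = [
    {"metric": {"node": "worker-1"}, "value": [1671000000.0, "42.0"]},
    {"metric": {"node": "worker-2"}, "value": [1671000000.0, "17.5"]},
]

flattened = dict(map(lambda x: (x["metric"]["node"], float(x["value"][1])), sample_nfs_result))
print(flattened)  # {'worker-1': 42.0, 'worker-2': 17.5}
```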
149 changes: 149 additions & 0 deletions basht/resources.py
@@ -0,0 +1,149 @@
from dataclasses import asdict, dataclass, field, fields, is_dataclass
from typing import List, Dict, Union, Any
import yaml


@dataclass
class SplitterConfig:
    val_split: float = 0.2
    test_split: float = 0.2


@dataclass
class Splitter:
    type: str = "StandardSplitter"
    config: SplitterConfig = field(default_factory=SplitterConfig)


@dataclass
class BatcherConfig:
    train_batch_size: int = 50
    val_batch_size: int = 50
    test_batch_size: int = 50


@dataclass
class Batcher:
    type: str = "StandardBatcher"
    config: BatcherConfig = field(default_factory=BatcherConfig)


@dataclass
class Task:  # TODO: need to rename as this might create ambiguity at some point
    preprocessors: List = field(default_factory=lambda: ["ImageFlattner"])
    loader: str = "mnist"

    splitter: Splitter = field(default_factory=Splitter)
    batcher: Batcher = field(default_factory=Batcher)

    def to_dict(self):
        return asdict(self)


@dataclass
class SearchRange:
    start: Union[float, List[int]]
    end: Union[float, List[int]]
    step_size: Union[float, List[int]]


@dataclass
class SearchSpace:
    learning_rate: SearchRange = field(default_factory=lambda: SearchRange(start=1e-4, end=1e-2, step_size=1e-3))
    weight_decay: SearchRange = field(default_factory=lambda: SearchRange(start=1e-6, end=1e-4, step_size=1e-5))
    hidden_layer_config: SearchRange = field(default_factory=lambda: SearchRange(start=[10], end=[100, 100, 100], step_size=[10, 1]))

    def to_dict(self):
        return asdict(self)


@dataclass
class Workload:
    epochs: int = 100
    task: Task = field(default_factory=Task)
    dl_framework: str = "torch"
    model_cls: str = "mlp"
    device: str = "cpu"


@dataclass(init=False)
class Resources:
    """
    Resource definition for an HPO benchmark.

    Args:
        trials (int): number of trials to run - up to the HPO
            framework to enforce

        metricsIP (str): IP address of the metrics server (usually the same as the benchmark runner)

        workerCpu (int): number of CPUs to allocate to each worker
        workerMemory (int): amount of memory to allocate to each worker in GB
        workerCount (int): number of workers to spawn (up to the HPO platform to translate to nodes)

        workload (Workload): workload definition for the benchmark, including the task, model,
            preprocessing, etc.

        searchspace (SearchSpace): hyperparameter search space definition for the benchmark,
            including its size, etc.
    """
    metricsIP: str = "auto"  # TODO: we should instead use a factory here

    trials: int = 100

    workerCpu: int = 2
    workerMemory: int = 2
    workerCount: int = 1
    deleteAfterRun: bool = True

    workload: Workload = Workload()
    searchspace: SearchSpace = SearchSpace()

    args: Dict[str, Any] = field(default_factory=dict)

    def __init__(self, **kwargs):
        self.args = dict()
        self._field_names = {f.name for f in fields(self)}
        self._field_types = {f.name: f.type for f in fields(self)}
        self._set_arguments(kwargs)

    def _set_arguments(self, kwargs):
        """Sets all attributes of the resource class. Some attributes are required by basht,
        some are custom to the benchmark.

        Args:
            kwargs (dict): attribute names mapped to their values
        """
        for k, v in kwargs.items():
            if k in self._field_names:
                if is_dataclass(self._field_types[k]):
                    v = self._field_types[k](**v)
                setattr(self, k, v)  # also enables custom attributes
            else:
                self.args[k] = v

    def update(self, **kwargs):
        self._set_arguments(kwargs)

    def to_dict(self):
        return asdict(self)

    def to_yaml(self):
        return yaml.dump(self.to_dict())

    def get(self, key):
        value = self.args.get(key)
        if value is not None:
            return value
        try:
            return self.__getattribute__(key)
        except AttributeError:
            raise AttributeError(f"The resource definition element {key} is not defined.")

    @staticmethod
    def from_yaml(yaml_path: str):
        with open(yaml_path, "r") as f:
            return Resources(**yaml.load(f, Loader=yaml.FullLoader))
7 changes: 4 additions & 3 deletions basht/results_tracker.py
@@ -1,10 +1,11 @@
 import logging
-from basht.latency_tracker import Tracker #TODO: move to utils
+from basht.latency_tracker import Tracker
 from basht.metrics import Result
 from basht.metrics_storage import MetricsStorageStrategy

+
 class ResultTracker(Tracker):
-    def __init__(self,store=MetricsStorageStrategy):
+    def __init__(self, store=MetricsStorageStrategy):
         self.store = store()
         self.store.setup()

@@ -18,7 +19,7 @@ def track(self, objective_function, result):
         r.classification_metrics = result

         try:
-            self.store.store(r,table_name="classification_metrics")
+            self.store.store(r, table_name="classification_metrics")
             logging.info("Stored result")
         except Exception as e:
             logging.error(f"failed to store result {e}")
6 changes: 3 additions & 3 deletions basht/utils/yaml.py
@@ -1,15 +1,15 @@
 from string import Template

 import ruamel.yaml


 class YMLHandler:

     @staticmethod
     def load_yaml(file_path):
-        """Safely writes an object to a YAML-File.
+        """Loads a yaml file and returns a dictionary
         Args:
-            yaml_path (str): filename to write yaml to
-            obj (any): object to save as yaml
+            file_path (str): filename to load yaml from
         """
         with open(file_path, "r") as f:
             file_dict = ruamel.yaml.safe_load(f)
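A minimal usage sketch of the loader, assuming it returns the loaded dictionary as its docstring states; the path mirrors the experiment script below and is illustrative only:

```python
# Minimal usage sketch (not from the PR); the path is illustrative and the
# return value is assumed from the docstring above.
from os import path

from basht.utils.yaml import YMLHandler

resource_def = YMLHandler.load_yaml(
    path.join("experiments", "optuna_kubernetes", "resource_definition.yml"))
print(resource_def["workerCount"])
```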
@@ -55,7 +55,7 @@ def deploy(self) -> None:
         Deploy DB
         """
         # TODO: deal with existing resources...
-
+        # XXX is this still relevant??!
         if self.hyperparameter:
             # TODO: XXX we got to fix this dependency thing. either merge minikube/kubernetes or use the same baseclass or something...
             f = path.join(path.dirname(__file__), "..", "optuna_minikube", "hyperparameter_space.yml")
41 changes: 41 additions & 0 deletions experiments/optuna_kubernetes/optuna_kubernetes_rmem.py
@@ -0,0 +1,41 @@
import logging
from os import path
from time import sleep
from experiments.optuna_kubernetes.optuna_kubernetes_benchmark import OptunaKubernetesBenchmark
from basht.benchmark_runner import BenchmarkRunner
from urllib.request import urlopen
from basht.utils.yaml import YMLHandler

if __name__ == "__main__":
    metricsIP = urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip()

    # read in base configuration
    resources = YMLHandler.load_yaml(path.join(path.dirname(__file__), "resource_definition.yml"))
    # TODO: XXX remove these hardcoded values
    to_automate = {
        "metricsIP": metricsIP,
        "dockerImageTag": "tawalaya/optuna-trial:latest",
        "dockerImageBuilder": "docker",
        "kubernetesContext": "admin@smile",
        "kubernetesMasterIP": "130.149.158.143",
        "prometheus_url": "http://130.149.158.143:30041",
        "deleteAfterRun": True,
        "epochs": 50,
    }
    resources.update(to_automate)

    repetitions = 2
    for i in range(1, repetitions + 1):
        for n in range(1, 7):
            sleep(3)
            logging.info(f"Starting Run {i} with {n} nodes with n_trials 100")
            try:
                resources["trials"] = 100
                resources["workerCount"] = n
                resources["goal"] = f"rnode{n}-100-{i}"
                runner = BenchmarkRunner(benchmark_cls=OptunaKubernetesBenchmark, resources=resources)
                runner.run()
                sleep(7)
                runner = None
            except Exception as e:
                logging.warning(f'Failed Run {i} with {n} nodes and n_trials 100 - {e}')