From e3eae68e45d313d599f6e610146e53dcd781f031 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Fri, 7 Mar 2025 10:44:59 +0200 Subject: [PATCH 1/7] odop/cli: Use click group directly as a decorator --- odop/cli/__main__.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py index 84d47be..4685098 100644 --- a/odop/cli/__main__.py +++ b/odop/cli/__main__.py @@ -23,7 +23,7 @@ def odop_cli(): pass -@click.command() +@odop_cli.command() @click.argument("run_name", nargs=1) @click.argument( "task_folder", @@ -43,7 +43,7 @@ def scan_tasks_folder(run_name, task_folder): odop.scan_tasks_folder(task_folder, task_parameters_folder, executables_folder) -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) @click.argument("task_name", type=str) def remove_task(run_name, task_name): @@ -58,7 +58,7 @@ def remove_task(run_name, task_name): print(f"Task {task_name} not found in run {run_name}") -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def list_tasks(run_name): """List task names in given run""" @@ -70,7 +70,7 @@ def list_tasks(run_name): print(task_file.replace(".json", "")) -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def visualize_folder(run_name): """Visualize cpu utilization of all core""" @@ -96,7 +96,7 @@ def request_api(run_name, endpoint): return {} -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def queue_summary(run_name): """Return a summary of the task queue for a given run.""" @@ -106,7 +106,7 @@ def queue_summary(run_name): print(f"{status}: {summaries[status]}") -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def queue_status(run_name): """Return the status of tasks in a given run.""" @@ -119,7 +119,7 @@ def queue_status(run_name): print() -@click.command() +@odop_cli.command() @click.argument("run_name", 
type=str) @click.argument("task_id", type=int) def queue_detail(run_name, task_id): @@ -128,7 +128,7 @@ def queue_detail(run_name, task_id): print("Task info:", task_info) -@click.command() +@odop_cli.command() @click.argument( "task_folder", required=False, @@ -142,14 +142,5 @@ def check_tasks(task_folder): odop.scan_tasks_folder(task_folder, write=False) -odop_cli.add_command(list_tasks) -odop_cli.add_command(scan_tasks_folder) -odop_cli.add_command(remove_task) -odop_cli.add_command(visualize_folder) -odop_cli.add_command(list_tasks) -odop_cli.add_command(queue_summary) -odop_cli.add_command(queue_status) -odop_cli.add_command(queue_detail) -odop_cli.add_command(check_tasks) if __name__ == "__main__": odop_cli() From df3493db65d669fbe7172b60ff1fb9ebdd1b8493 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Thu, 13 Feb 2025 11:28:36 +0200 Subject: [PATCH 2/7] odop: Ensure ODOP_PATH is pathlib.Path --- odop/common.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/odop/common.py b/odop/common.py index 9146b19..eba35fc 100644 --- a/odop/common.py +++ b/odop/common.py @@ -26,7 +26,9 @@ def create_logger(name, level=logging.INFO): if ODOP_PATH is None: user_home = Path.home() # assume that we have odop dir - ODOP_PATH = os.path.join(user_home, ".odop") + ODOP_PATH = user_home / ".odop" os.makedirs(ODOP_PATH, exist_ok=True) +else: + ODOP_PATH = Path(ODOP_PATH) RUN_ID = os.getenv("RUN_ID") From ab1f81be93be29c4bf664e59eb08238f04763034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Thu, 13 Feb 2025 11:37:34 +0200 Subject: [PATCH 3/7] odop: Ensure ODOP_PATH is always created --- odop/common.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/odop/common.py b/odop/common.py index eba35fc..dfa4cc0 100644 --- a/odop/common.py +++ b/odop/common.py @@ -27,8 +27,10 @@ def create_logger(name, level=logging.INFO): user_home = Path.home() # assume that we have odop dir ODOP_PATH = 
user_home / ".odop" - os.makedirs(ODOP_PATH, exist_ok=True) else: ODOP_PATH = Path(ODOP_PATH) +if not ODOP_PATH.is_dir(): + os.makedirs(ODOP_PATH, exist_ok=True) + RUN_ID = os.getenv("RUN_ID") From cba844347b2f328792d963fd3cf9aa92290774d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Thu, 13 Feb 2025 11:38:41 +0200 Subject: [PATCH 4/7] odop: Define and use a common ODOP_RUNS_PATH variable --- odop/cli/__main__.py | 11 ++++------- odop/common.py | 2 ++ odop/ui/ui.py | 4 ++-- odop/utils/dashboard.py | 9 ++++----- odop/utils/pages/current_optask.py | 6 ++---- odop/utils/pages/real_time_monitoring.py | 10 ++++------ 6 files changed, 18 insertions(+), 24 deletions(-) diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py index 4685098..280dbb9 100644 --- a/odop/cli/__main__.py +++ b/odop/cli/__main__.py @@ -4,13 +4,12 @@ import requests import odop -from odop.common import ODOP_PATH +from odop.common import ODOP_RUNS_PATH from odop.ui import Status def get_status(run_name): - runs_base_path = os.path.join(ODOP_PATH, "runs") - run_folder = os.path.join(runs_base_path, run_name) + run_folder = os.path.join(ODOP_RUNS_PATH, run_name) status_file = os.path.join(run_folder, "status") status = Status(status_file) return status @@ -35,8 +34,7 @@ def scan_tasks_folder(run_name, task_folder): """Loads tasks from the task folder path and all subpaths.""" print("Task folder:", task_folder) - runs_base_path = os.path.join(ODOP_PATH, "runs") - run_folder = os.path.join(runs_base_path, run_name) + run_folder = os.path.join(ODOP_RUNS_PATH, run_name) executables_folder = os.path.join(run_folder, "executables") task_parameters_folder = os.path.join(run_folder, "task_parameters") @@ -76,8 +74,7 @@ def visualize_folder(run_name): """Visualize cpu utilization of all core""" from .visualization.visualize_all_folder import process_folder - runs_base_path = os.path.join(ODOP_PATH, "runs") - run_folder = os.path.join(runs_base_path, run_name) + run_folder = 
os.path.join(ODOP_RUNS_PATH, run_name) data_folder = os.path.join(run_folder, "metric_database") process_folder(data_folder) diff --git a/odop/common.py b/odop/common.py index dfa4cc0..1fcd919 100644 --- a/odop/common.py +++ b/odop/common.py @@ -34,3 +34,5 @@ def create_logger(name, level=logging.INFO): os.makedirs(ODOP_PATH, exist_ok=True) RUN_ID = os.getenv("RUN_ID") + +ODOP_RUNS_PATH = ODOP_PATH / "runs" diff --git a/odop/ui/ui.py b/odop/ui/ui.py index 38589a8..5da79cf 100644 --- a/odop/ui/ui.py +++ b/odop/ui/ui.py @@ -8,7 +8,7 @@ import odop import odop.scheduler as scheduler -from odop.common import ODOP_PATH, create_logger +from odop.common import ODOP_PATH, ODOP_RUNS_PATH, create_logger from odop.odop_obs import OdopObs from odop.ui import Status, read_config @@ -188,7 +188,7 @@ def start( ) run_name = runtime_config["run_name"] - self.run_folder = os.path.join(ODOP_PATH, "runs", run_name) + self.run_folder = os.path.join(ODOP_RUNS_PATH, run_name) os.makedirs(self.run_folder, exist_ok=True) self.status = Status(os.path.join(self.run_folder, "status")) self.status.reset() diff --git a/odop/utils/dashboard.py b/odop/utils/dashboard.py index c4474ca..ae7ab98 100644 --- a/odop/utils/dashboard.py +++ b/odop/utils/dashboard.py @@ -8,7 +8,7 @@ import streamlit as st from odop.cli.visualization.common import extract_data_from_file_path -from odop.common import ODOP_PATH +from odop.common import ODOP_RUNS_PATH @st.cache_data() @@ -59,12 +59,11 @@ def plot_monitoring_data_with_slider(timestamps, cpu_values, allowed_cpu_values) st.altair_chart(chart, use_container_width=True) -base_runs_path = os.path.join(ODOP_PATH, "runs") -chosen_run = st.selectbox("Which run you want to see", os.listdir(base_runs_path)) +chosen_run = st.selectbox("Which run you want to see", os.listdir(ODOP_RUNS_PATH)) node_to_choose = [ Path(filepath).stem for filepath in glob.glob( - os.path.join(base_runs_path, chosen_run, "metric_database/*.csv") + os.path.join(ODOP_RUNS_PATH, chosen_run, 
"metric_database/*.csv") ) ] chosen_node = st.selectbox( @@ -74,7 +73,7 @@ def plot_monitoring_data_with_slider(timestamps, cpu_values, allowed_cpu_values) timestamps, cpu_values, allowed_cpu_values = load_data( os.path.join( - base_runs_path, + ODOP_RUNS_PATH, chosen_run, "metric_database", f"{chosen_node}.csv", diff --git a/odop/utils/pages/current_optask.py b/odop/utils/pages/current_optask.py index 34dba2b..df176be 100644 --- a/odop/utils/pages/current_optask.py +++ b/odop/utils/pages/current_optask.py @@ -5,7 +5,7 @@ import streamlit as st import yaml -from odop.common import ODOP_PATH +from odop.common import ODOP_RUNS_PATH def query_data(hostname: str) -> list[dict]: @@ -28,9 +28,7 @@ def get_config_data(run_path: str): return None -base_runs_path = os.path.join(ODOP_PATH, "runs") - -run_paths = os.listdir(base_runs_path) +run_paths = os.listdir(ODOP_RUNS_PATH) run_paths = filter_run_dir(run_paths) run_paths.sort() diff --git a/odop/utils/pages/real_time_monitoring.py b/odop/utils/pages/real_time_monitoring.py index bd10b82..b7538f4 100644 --- a/odop/utils/pages/real_time_monitoring.py +++ b/odop/utils/pages/real_time_monitoring.py @@ -7,7 +7,7 @@ import streamlit as st import yaml -from odop.common import ODOP_PATH, create_logger +from odop.common import ODOP_RUNS_PATH, create_logger logger = create_logger("monitoring") @@ -32,21 +32,19 @@ def get_config_data(run_path: str): return None -base_runs_path = os.path.join(ODOP_PATH, "runs") - -run_paths = os.listdir(base_runs_path) +run_paths = os.listdir(ODOP_RUNS_PATH) run_paths = filter_run_dir(run_paths) run_paths.sort() chosen_run = st.selectbox("Which run you want to monitor", run_paths) node_names = list( - glob.glob(os.path.join(base_runs_path, chosen_run, "metric_database/*.csv")) + glob.glob(os.path.join(ODOP_RUNS_PATH, chosen_run, "metric_database/*.csv")) ) node_to_choose = [ Path(filepath).stem for filepath in glob.glob( - os.path.join(base_runs_path, chosen_run, "metric_database/*.csv") + 
os.path.join(ODOP_RUNS_PATH, chosen_run, "metric_database/*.csv") ) ] node_to_choose.sort() From c7f1d02ee19cd462ce17feedbe9e68c2796b4614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Thu, 6 Mar 2025 19:42:31 +0200 Subject: [PATCH 5/7] odop: Add method to list all known ODOP runs --- odop/cli/__main__.py | 23 ++++++++++++++++++++++- odop/common.py | 18 ++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py index 280dbb9..28d4abf 100644 --- a/odop/cli/__main__.py +++ b/odop/cli/__main__.py @@ -1,10 +1,11 @@ import os +import pathlib import click import requests import odop -from odop.common import ODOP_RUNS_PATH +from odop.common import ODOP_RUNS_PATH, get_runs from odop.ui import Status @@ -56,6 +57,26 @@ def remove_task(run_name, task_name): print(f"Task {task_name} not found in run {run_name}") +@odop_cli.command() +def list_runs(): + """List all known runs and their statuts""" + runs: list[pathlib.Path] = get_runs() + + run_name_len = 0 + for run in runs: + if len(run.name) <= run_name_len: + continue + run_name_len = len(run.name) + run_name_len = min(run_name_len, 20) + + print(f"{'': <2}\t{'RUN NAME': <{run_name_len}}\t{'STATUS': <8}") + for idx, run in enumerate(runs): + run_name = run.name + status: Status = get_status(run_name) + + print(f"{idx: <2}\t{run_name: <{run_name_len}}\t{status['runtime_status']: <8}") + + @odop_cli.command() @click.argument("run_name", type=str) def list_tasks(run_name): diff --git a/odop/common.py b/odop/common.py index 1fcd919..0250687 100644 --- a/odop/common.py +++ b/odop/common.py @@ -36,3 +36,21 @@ def create_logger(name, level=logging.INFO): RUN_ID = os.getenv("RUN_ID") ODOP_RUNS_PATH = ODOP_PATH / "runs" + +def get_runs() -> list[Path]: + runs = [] + + logger.info(f"Collecting runs from {ODOP_RUNS_PATH}.") + + if not ODOP_RUNS_PATH.is_dir(): + logger.info("The runs directory does not exist.") + return runs + + for path in 
ODOP_RUNS_PATH.iterdir(): + if not path.is_dir(): + logger.debug(f"{path} is not a directory. Ignoring it.") + continue + + runs.append(path) + + return sorted(runs) From 15b628b5e67f0637185436c8f8541baeb6363398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Thu, 6 Mar 2025 19:39:22 +0200 Subject: [PATCH 6/7] odop: Add a new class for tracking runs as benchmark records ODOP keeps track of individual runs. The idea is that after the end of a run the final results can be saved aside, compressed and also annotated if needed for benchmarking or archival purposes. A benchmark record should hold the following information: - the name of the ODOP run - the used ODOP config - metrics captured during the run using OdopObs - total execution time - runtime environment This adds only the basic class with initial loading logic. The conversion of ODOP runs into benchmark records will be added in a follow-up commit. --- odop/benchmark/__init__.py | 101 +++++++++++++++++++++++++++++++++++++ odop/cli/__main__.py | 23 +++++++++ odop/common.py | 2 + pyproject.toml | 1 + 4 files changed, 127 insertions(+) create mode 100644 odop/benchmark/__init__.py diff --git a/odop/benchmark/__init__.py b/odop/benchmark/__init__.py new file mode 100644 index 0000000..2fb2dc6 --- /dev/null +++ b/odop/benchmark/__init__.py @@ -0,0 +1,101 @@ +#!/bin/python3 + +from __future__ import annotations + +import contextlib +import datetime +import heapq +import json +import logging +import os +import pathlib +import shutil +import tarfile +import uuid + +import click +import tinyflux + +from odop.common import ODOP_CONF_FILENAME, ODOP_PATH, ODOP_RUNS_PATH, create_logger +from odop.ui import read_config + +ODOP_BENCHMARKS_PATH = ODOP_PATH / "benchmarks" + +RECORD_INFO_FILENAME = "info.json" + +logger = create_logger("benchmark", logging.ERROR) + + +def _check_uuid(id) -> bool: + try: + uuid.UUID(str(id)) + except ValueError: + return False + + return True + + +# id +# run_name +# 
ODOP config +# Metrics (merged and compressed) +# Runtime environment dump +class BenchmarkRecord: + def __init__(self, id: str): + self.id: str = id + + self._path: pathlib.Path = ODOP_BENCHMARKS_PATH / self.id + self._config_path: pathlib.Path = self._path / ODOP_CONF_FILENAME + self._info_path: pathlib.Path = self._path / RECORD_INFO_FILENAME + + self._validate() + + with open(self._info_path) as f: + with json.load(f) as info: + self.run_name = info["run_name"] + + def _validate(self): + if not self._path.is_dir(): + raise FileNotFoundError + if not os.access(self._path, os.R_OK | os.W_OK | os.X_OK): + raise PermissionError + + if not self._config_path.is_file(): + raise FileNotFoundError + if not os.access(self._config_path, os.R_OK): + raise PermissionError + + if not self._info_path.is_file(): + raise FileNotFoundError + if not os.access(self._info_path, os.R_OK): + raise PermissionError + + @classmethod + def from_benchmark_record(cls, record_path: pathlib.Path) -> BenchmarkRecord: + id = record_path.name + if not _check_uuid(id): + raise ValueError(f"{id} is not a valid UUID.") + + # TODO: Add parsing of info file + + return cls(id) + + +def get_benchmark_records() -> list[BenchmarkRecord]: + records: list[BenchmarkRecord] = [] + + logger.info(f"Collecting benchmark records from {ODOP_BENCHMARKS_PATH}.") + + if not ODOP_BENCHMARKS_PATH.is_dir(): + logger.info("The benchmarks directory does not exist.") + return records + + for path in ODOP_BENCHMARKS_PATH.iterdir(): + if not path.is_dir(): + logger.debug(f"{path} is not a directory. 
Ignoring it.") + continue + + record: BenchmarkRecord = BenchmarkRecord.from_benchmark_record(path) + records.append(record) + + return records diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py index 28d4abf..49ca5ac 100644 --- a/odop/cli/__main__.py +++ b/odop/cli/__main__.py @@ -5,6 +5,7 @@ import requests import odop +from odop.benchmark import BenchmarkRecord, get_benchmark_records from odop.common import ODOP_RUNS_PATH, get_runs from odop.ui import Status @@ -160,5 +161,27 @@ def check_tasks(task_folder): odop.scan_tasks_folder(task_folder, write=False) +@odop_cli.group() +def benchmark(): + pass + + +@benchmark.command("list") +def benchmark_list(): + """List known benchmark records.""" + records: list[BenchmarkRecord] = get_benchmark_records() + + run_name_len = 0 + for record in records: + if len(record.run_name) <= run_name_len: + continue + run_name_len = len(record.run_name) + run_name_len = min(run_name_len, 20) + + print(f"{'ID': <36}\t{'RUN NAME': <{run_name_len}}") + for record in records: + print(f"{record.id: <36}\t{record.run_name: <{run_name_len}}") + + if __name__ == "__main__": odop_cli() diff --git a/odop/common.py b/odop/common.py index 0250687..1ab106a 100644 --- a/odop/common.py +++ b/odop/common.py @@ -17,6 +17,8 @@ def create_logger(name, level=logging.INFO): logger = create_logger("odop") +ODOP_CONF_FILENAME = "odop_conf.yaml" + try: ODOP_PATH = os.getenv("ODOP_PATH") except KeyError: diff --git a/pyproject.toml b/pyproject.toml index 891c25f..40f29b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "pydantic-settings", "loguru>=0.7.2", "streamlit>=1.39.0", + "tinyflux", ] [project.optional-dependencies] From a69e94d153d174ed5156594bd00cd45da9ff7f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20M=C3=ADchal?= Date: Thu, 6 Mar 2025 19:40:48 +0200 Subject: [PATCH 7/7] odop/benchmark: Add conversion of an ODOP run into a benchmark record During a run ODOP collects a number of metrics used 
not just for scheduling. They are vital when wanting to benchmark the effect of ODOP on an application. Currently OdopObs uses Tinyflux[0] library for storing time-series metrics. These are stored per-node in a multi-node set-up. For more convenient post-processing, it seems better to have all the metrics from all nodes in the same time series. This merge is done using a heap queue. In case the amount of metrics is already quite high, the recommendation for tinyflux is to split a database into multiple files to keep the query speeds reasonable[1]. [0] https://github.com/citrusvanilla/tinyflux [1] https://tinyflux.readthedocs.io/en/latest/tips.html#dealing-with-growing-datasets --- odop/benchmark/__init__.py | 120 +++++++++++++++++++++++++++++++++++++ odop/cli/__main__.py | 30 ++++++++++ 2 files changed, 150 insertions(+) diff --git a/odop/benchmark/__init__.py b/odop/benchmark/__init__.py index 2fb2dc6..b56962b 100644 --- a/odop/benchmark/__init__.py +++ b/odop/benchmark/__init__.py @@ -21,6 +21,7 @@ ODOP_BENCHMARKS_PATH = ODOP_PATH / "benchmarks" +METRICS_TAR_FILENAME = "metrics.tar.gz" RECORD_INFO_FILENAME = "info.json" logger = create_logger("benchmark", logging.ERROR) @@ -35,6 +36,64 @@ def _check_uuid(id) -> bool: return True +# TODO: Cleanup the in-progress benchmark directory in case there was a problem during the process. +def _process_run_metrics(metrics_path: pathlib.Path, output_path: pathlib.Path): + """ + Read all metrics and re-save by adding a new tag that identifies the machine where the measurement was taken. If + there are too many, split them across multiple files ordered by the time of measurements. 
+ + """ + logger.info("Merging run metric databases into a single stream.") + + if not metrics_path.is_dir(): + raise FileNotFoundError(f"Metrics directory {metrics_path} does not exist.") + + if not output_path.is_dir(): + raise FileNotFoundError(f"Output directory {output_path} does not exist.") + + if not os.access(output_path, os.W_OK | os.X_OK): + raise PermissionError(f"Output directory {output_path} is not writeable.") + + max_records_in_db = 100000 + + def create_db(time: datetime.datetime) -> tinyflux.TinyFlux: + time_str = time.strftime("%s") + db_path = output_path / f"metrics-{time_str}.csv" + return tinyflux.TinyFlux(db_path), db_path + + metric_files = list( + filter(lambda p: p.is_file() and p.suffix == ".csv", metrics_path.iterdir()) + ) + + # Merge the per-node metric file into a single ordered time-series database. Only if the number of records starts + # exceeding a certain threshold, create a new DB which will contain entries from a specific timestamp. + db: tinyflux.TinyFlux = None + db_path: pathlib.Path = None + + run_metrics_dbs: list[tinyflux.TinyFlux] = [ + iter(tinyflux.TinyFlux(path)) for path in metric_files + ] + with tarfile.open(output_path / METRICS_TAR_FILENAME, "w:gz") as tar: + it = heapq.merge(*run_metrics_dbs, key=lambda point: point.time) + for point in it: + if db is None: + db, db_path = create_db(point.time) + + if len(db) >= max_records_in_db: + # Store the now saturated database in the compressed tarball. + tar.add(db_path) + db_path.unlink() + + db, db_path = create_db(point.time) + + # TODO: Add node identifier to the point + db.insert(point) + + # There is always at least one database left. 
+ tar.add(db_path) + db_path.unlink() + + # id # run_name # ODOP config @@ -47,6 +106,7 @@ def __init__(self, id: str): self._path: pathlib.Path = ODOP_BENCHMARKS_PATH / self.id self._config_path: pathlib.Path = self._path / ODOP_CONF_FILENAME self._info_path: pathlib.Path = self._path / RECORD_INFO_FILENAME + self._metrics_path: pathlib.Path = self._path / METRICS_TAR_FILENAME self._validate() @@ -80,6 +140,66 @@ def from_benchmark_record(cls, record_path: pathlib.Path) -> BenchmarkRecord: return cls(id) + @classmethod + def from_run(cls, run_path: pathlib.Path) -> BenchmarkRecord: + """ + + Returns: + + Raises: + """ + run_name: str = run_path.name + + if not run_path.is_dir(): + logger.errorf("Run directory {run_path} does not exist.") + raise FileNotFoundError + + logger.info(f"Creating a new benchmark record from run {run_name}.") + + # Create an identifier and prepare a directory for the benchmark record data. + # In case the identifier is not unique, try again with a new one. + while True: + id = str(uuid.uuid4()) + + record_path: pathlib.Path = ODOP_BENCHMARKS_PATH / id + try: + record_path.mkdir(0o755, True, False) + except FileExistsError: + continue + + break + + # Populate the info file of the record. + info = { + "run_name": run_name, + } + + record_info_path: pathlib.Path = record_path / RECORD_INFO_FILENAME + with open(record_info_path, "w+") as f: + json.dump(info, f) + + # Archive the odop_conf.yaml used in the run. + odop_conf_src_path: pathlib.Path = run_path / ODOP_CONF_FILENAME + if odop_conf_src_path.is_file(): + odop_conf_dst_path: pathlib.Path = record_path / ODOP_CONF_FILENAME + shutil.copy2(odop_conf_src_path, odop_conf_dst_path) + else: + logger.warning( + f"The {run_path} directory does not contain the {ODOP_CONF_FILENAME} used in the run. It cannot be included with the benchmark." + ) + + # TODO: How to capture the environment information? Should this already be marked down by the OdopObs since it + # tracks it? 
+ + # Find and merge the node-specific metrics into a single time-ordered stream. + metrics_path: pathlib.Path = run_path / "metric_database" + try: + _process_run_metrics(metrics_path, record_path) + except: + raise + + return cls(id) + def get_benchmark_records() -> list[BenchmarkRecord]: records: list[BenchmarkRecord] = [] diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py index 49ca5ac..74cf294 100644 --- a/odop/cli/__main__.py +++ b/odop/cli/__main__.py @@ -1,5 +1,6 @@ import os import pathlib +import sys import click import requests @@ -183,5 +184,34 @@ def benchmark_list(): print(f"{record.id: <36}\t{record.run_name: <{run_name_len}}") +@benchmark.command("save") +@click.argument("run_id", type=click.IntRange(0), required=True) +def benchmark_save(run_id: int): + """Convert an ODOP run into a benchmark record.""" + # TODO: Add support for specifying using run name. Will require a prompt in case there are multiple runs with the + # same name. + runs: list[pathlib.Path] = get_runs() + if len(runs) == 0: + print("There are no runs to save. Aborting.") + + if run_id > len(runs) - 1: + raise click.BadArgumentUsage( + f"Value {run_id} of 'run_id' is not valid. Use values from within the range <0, {len(runs) - 1}>." + ) + + selected_run: pathlib.Path = runs[run_id] + try: + record: BenchmarkRecord = BenchmarkRecord.from_run(selected_run) + except FileNotFoundError: + print( + f"Unexpected error: The run directory {selected_run} does not exist.", + file=sys.stderr, + ) + + print( + f"Created a new benchmark record of run {record.run_name} with ID {record.id}." + ) + + if __name__ == "__main__": odop_cli()