diff --git a/odop/benchmark/__init__.py b/odop/benchmark/__init__.py
new file mode 100644
index 0000000..b56962b
--- /dev/null
+++ b/odop/benchmark/__init__.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python3
+
+from __future__ import annotations
+
+import datetime
+import heapq
+import json
+import logging
+import os
+import pathlib
+import shutil
+import tarfile
+import uuid
+
+import tinyflux
+
+from odop.common import ODOP_CONF_FILENAME, ODOP_PATH, create_logger
+
+ODOP_BENCHMARKS_PATH = ODOP_PATH / "benchmarks"
+
+METRICS_TAR_FILENAME = "metrics.tar.gz"
+RECORD_INFO_FILENAME = "info.json"
+
+logger = create_logger("benchmark", logging.ERROR)
+
+
+def _check_uuid(id: str) -> bool:
+    try:
+        uuid.UUID(id)
+    except ValueError:
+        return False
+
+    return True
+
+
+# TODO: Clean up the in-progress benchmark directory in case there was a problem during the process.
+def _process_run_metrics(metrics_path: pathlib.Path, output_path: pathlib.Path):
+    """
+    Read all per-node metrics and re-save them, adding a new tag that identifies the machine where
+    each measurement was taken. If there are too many records, split them across multiple files
+    ordered by measurement time.
+    """
+    logger.info("Merging run metric databases into a single stream.")
+
+    if not metrics_path.is_dir():
+        raise FileNotFoundError(f"Metrics directory {metrics_path} does not exist.")
+
+    if not output_path.is_dir():
+        raise FileNotFoundError(f"Output directory {output_path} does not exist.")
+
+    if not os.access(output_path, os.W_OK | os.X_OK):
+        raise PermissionError(f"Output directory {output_path} is not writable.")
+
+    max_records_in_db = 100000
+
+    def create_db(time: datetime.datetime) -> tuple[tinyflux.TinyFlux, pathlib.Path]:
+        # Name the database after its first timestamp (seconds since the epoch).
+        time_str = str(int(time.timestamp()))
+        db_path = output_path / f"metrics-{time_str}.csv"
+        return tinyflux.TinyFlux(db_path), db_path
+
+    metric_files = list(
+        filter(lambda p: p.is_file() and p.suffix == ".csv", metrics_path.iterdir())
+    )
+
+    # Merge the per-node metric files into a single time-ordered database. Once the number of
+    # records exceeds the threshold, start a new DB that holds the entries from that timestamp
+    # onwards.
+    db: tinyflux.TinyFlux | None = None
+    db_path: pathlib.Path | None = None
+
+    # heapq.merge only yields a sorted stream if each input iterator is itself sorted by the
+    # key, i.e. each per-node CSV must already be ordered by time.
+    run_metrics_dbs = [iter(tinyflux.TinyFlux(path)) for path in metric_files]
+    with tarfile.open(output_path / METRICS_TAR_FILENAME, "w:gz") as tar:
+        it = heapq.merge(*run_metrics_dbs, key=lambda point: point.time)
+        for point in it:
+            if db is None:
+                db, db_path = create_db(point.time)
+
+            if len(db) >= max_records_in_db:
+                # Store the now saturated database in the compressed tarball.
+                tar.add(db_path)
+                db_path.unlink()
+
+                db, db_path = create_db(point.time)
+
+            # TODO: Add node identifier to the point
+            db.insert(point)
+
+        # Flush the last, partially filled database; it only exists if any points were merged.
+        if db_path is not None:
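+
+# Illustrative sizing (an assumption, not a requirement): merging 250,000
+# points against the 100,000-record threshold archives two full databases
+# inside the loop, and the final flush adds a third, partially filled one
+# to metrics.tar.gz.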
+            tar.add(db_path)
+            db_path.unlink()
+
+
+class BenchmarkRecord:
+    """A saved benchmark consisting of an id, the run_name, the ODOP config, the merged and
+    compressed metrics, and (eventually) a runtime environment dump.
+    """
+
+    def __init__(self, id: str):
+        self.id: str = id
+
+        self._path: pathlib.Path = ODOP_BENCHMARKS_PATH / self.id
+        self._config_path: pathlib.Path = self._path / ODOP_CONF_FILENAME
+        self._info_path: pathlib.Path = self._path / RECORD_INFO_FILENAME
+        self._metrics_path: pathlib.Path = self._path / METRICS_TAR_FILENAME
+
+        self._validate()
+
+        with open(self._info_path) as f:
+            info = json.load(f)
+        self.run_name = info["run_name"]
+
+    def _validate(self):
+        if not self._path.is_dir():
+            raise FileNotFoundError(self._path)
+        if not os.access(self._path, os.R_OK | os.W_OK | os.X_OK):
+            raise PermissionError(self._path)
+
+        if not self._config_path.is_file():
+            raise FileNotFoundError(self._config_path)
+        if not os.access(self._config_path, os.R_OK):
+            raise PermissionError(self._config_path)
+
+        if not self._info_path.is_file():
+            raise FileNotFoundError(self._info_path)
+        if not os.access(self._info_path, os.R_OK):
+            raise PermissionError(self._info_path)
+
+    @classmethod
+    def from_benchmark_record(cls, record_path: pathlib.Path) -> BenchmarkRecord:
+        id = record_path.name
+        if not _check_uuid(id):
+            raise ValueError(f"{id} is not a valid UUID.")
+
+        # TODO: Add parsing of info file
+
+        return cls(id)
+
+    @classmethod
+    def from_run(cls, run_path: pathlib.Path) -> BenchmarkRecord:
+        """Create a benchmark record from an existing run directory.
+
+        Returns:
+            The newly created BenchmarkRecord.
+
+        Raises:
+            FileNotFoundError: If the run directory does not exist.
+        """
+        run_name: str = run_path.name
+
+        if not run_path.is_dir():
+            logger.error(f"Run directory {run_path} does not exist.")
+            raise FileNotFoundError(run_path)
+
+        logger.info(f"Creating a new benchmark record from run {run_name}.")
+
+        # Create an identifier and prepare a directory for the benchmark record data.
+        # In case the identifier is not unique, try again with a new one.
+        while True:
+            id = str(uuid.uuid4())
+
+            record_path: pathlib.Path = ODOP_BENCHMARKS_PATH / id
+            try:
+                record_path.mkdir(mode=0o755, parents=True, exist_ok=False)
+            except FileExistsError:
+                continue
+
+            break
+
+        # Populate the info file of the record.
+        info = {
+            "run_name": run_name,
+        }
+
+        record_info_path: pathlib.Path = record_path / RECORD_INFO_FILENAME
+        with open(record_info_path, "w") as f:
+            json.dump(info, f)
+
+        # Archive the odop_conf.yaml used in the run.
+        odop_conf_src_path: pathlib.Path = run_path / ODOP_CONF_FILENAME
+        if odop_conf_src_path.is_file():
+            odop_conf_dst_path: pathlib.Path = record_path / ODOP_CONF_FILENAME
+            shutil.copy2(odop_conf_src_path, odop_conf_dst_path)
+        else:
+            logger.warning(
+                f"The {run_path} directory does not contain the {ODOP_CONF_FILENAME} used in the run. It cannot be included with the benchmark."
+            )
+
+        # TODO: How do we capture the environment information? Should OdopObs already record it,
+        # since it tracks the environment?
+
+        # Find and merge the node-specific metrics into a single time-ordered stream.
+        metrics_path: pathlib.Path = run_path / "metric_database"
+        _process_run_metrics(metrics_path, record_path)
+
+        return cls(id)
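+
+
+# On disk a record lives under $ODOP_PATH/benchmarks/<uuid>/ (by default
+# ~/.odop/benchmarks/) and holds info.json, metrics.tar.gz and, when the
+# run kept one, odop_conf.yaml.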
Ignoring it.") + continue + + record: BenchmarkRecord = BenchmarkRecord.from_benchmark_record(path) + records.append(record) + + return records diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py index 84d47be..74cf294 100644 --- a/odop/cli/__main__.py +++ b/odop/cli/__main__.py @@ -1,16 +1,18 @@ import os +import pathlib +import sys import click import requests import odop -from odop.common import ODOP_PATH +from odop.benchmark import BenchmarkRecord, get_benchmark_records +from odop.common import ODOP_RUNS_PATH, get_runs from odop.ui import Status def get_status(run_name): - runs_base_path = os.path.join(ODOP_PATH, "runs") - run_folder = os.path.join(runs_base_path, run_name) + run_folder = os.path.join(ODOP_RUNS_PATH, run_name) status_file = os.path.join(run_folder, "status") status = Status(status_file) return status @@ -23,7 +25,7 @@ def odop_cli(): pass -@click.command() +@odop_cli.command() @click.argument("run_name", nargs=1) @click.argument( "task_folder", @@ -35,15 +37,14 @@ def scan_tasks_folder(run_name, task_folder): """Loads tasks from the task folder path and all subpaths.""" print("Task folder:", task_folder) - runs_base_path = os.path.join(ODOP_PATH, "runs") - run_folder = os.path.join(runs_base_path, run_name) + run_folder = os.path.join(ODOP_RUNS_PATH, run_name) executables_folder = os.path.join(run_folder, "executables") task_parameters_folder = os.path.join(run_folder, "task_parameters") odop.scan_tasks_folder(task_folder, task_parameters_folder, executables_folder) -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) @click.argument("task_name", type=str) def remove_task(run_name, task_name): @@ -58,7 +59,27 @@ def remove_task(run_name, task_name): print(f"Task {task_name} not found in run {run_name}") -@click.command() +@odop_cli.command() +def list_runs(): + """List all known runs and their statuts""" + runs: list[pathlib.Path] = get_runs() + + run_name_len = 0 + for run in runs: + if len(run.name) <= run_name_len: + continue + run_name_len = len(run.name) + run_name_len = min(run_name_len, 20) + + print(f"{'': <2}\t{'RUN NAME': <{run_name_len}}\t{'STATUS': <8}") + for idx, run in enumerate(runs): + run_name = run.name + status: Status = get_status(run_name) + + print(f"{idx: <2}\t{run_name: <{run_name_len}}\t{status['runtime_status']: <8}") + + +@odop_cli.command() @click.argument("run_name", type=str) def list_tasks(run_name): """List task names in given run""" @@ -70,14 +91,13 @@ def list_tasks(run_name): print(task_file.replace(".json", "")) -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def visualize_folder(run_name): """Visualize cpu utilization of all core""" from .visualization.visualize_all_folder import process_folder - runs_base_path = os.path.join(ODOP_PATH, "runs") - run_folder = os.path.join(runs_base_path, run_name) + run_folder = os.path.join(ODOP_RUNS_PATH, run_name) data_folder = os.path.join(run_folder, "metric_database") process_folder(data_folder) @@ -96,7 +116,7 @@ def request_api(run_name, endpoint): return {} -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def queue_summary(run_name): """Return a summary of the task queue for a given run.""" @@ -106,7 +126,7 @@ def queue_summary(run_name): print(f"{status}: {summaries[status]}") -@click.command() +@odop_cli.command() @click.argument("run_name", type=str) def queue_status(run_name): """Return the status of tasks in a given run.""" @@ -119,7 +139,7 @@ def queue_status(run_name): print() -@click.command() 
diff --git a/odop/cli/__main__.py b/odop/cli/__main__.py
index 84d47be..74cf294 100644
--- a/odop/cli/__main__.py
+++ b/odop/cli/__main__.py
@@ -1,16 +1,18 @@
 import os
+import pathlib
+import sys
 
 import click
 import requests
 
 import odop
-from odop.common import ODOP_PATH
+from odop.benchmark import BenchmarkRecord, get_benchmark_records
+from odop.common import ODOP_RUNS_PATH, get_runs
 from odop.ui import Status
 
 
 def get_status(run_name):
-    runs_base_path = os.path.join(ODOP_PATH, "runs")
-    run_folder = os.path.join(runs_base_path, run_name)
+    run_folder = os.path.join(ODOP_RUNS_PATH, run_name)
     status_file = os.path.join(run_folder, "status")
     status = Status(status_file)
     return status
@@ -23,7 +25,7 @@ def odop_cli():
     pass
 
 
-@click.command()
+@odop_cli.command()
 @click.argument("run_name", nargs=1)
 @click.argument(
     "task_folder",
@@ -35,15 +37,14 @@ def scan_tasks_folder(run_name, task_folder):
     """Loads tasks from the task folder path and all subpaths."""
     print("Task folder:", task_folder)
 
-    runs_base_path = os.path.join(ODOP_PATH, "runs")
-    run_folder = os.path.join(runs_base_path, run_name)
+    run_folder = os.path.join(ODOP_RUNS_PATH, run_name)
     executables_folder = os.path.join(run_folder, "executables")
     task_parameters_folder = os.path.join(run_folder, "task_parameters")
 
     odop.scan_tasks_folder(task_folder, task_parameters_folder, executables_folder)
 
 
-@click.command()
+@odop_cli.command()
 @click.argument("run_name", type=str)
 @click.argument("task_name", type=str)
 def remove_task(run_name, task_name):
@@ -58,7 +59,27 @@ def remove_task(run_name, task_name):
     print(f"Task {task_name} not found in run {run_name}")
 
 
-@click.command()
+@odop_cli.command()
+def list_runs():
+    """List all known runs and their statuses."""
+    runs: list[pathlib.Path] = get_runs()
+
+    # Size the name column to the longest run name, capped at 20 characters.
+    run_name_len = min(max((len(run.name) for run in runs), default=0), 20)
+
+    print(f"{'': <2}\t{'RUN NAME': <{run_name_len}}\t{'STATUS': <8}")
+    for idx, run in enumerate(runs):
+        run_name = run.name
+        status: Status = get_status(run_name)
+
+        print(f"{idx: <2}\t{run_name: <{run_name_len}}\t{status['runtime_status']: <8}")
+
+
+@odop_cli.command()
 @click.argument("run_name", type=str)
 def list_tasks(run_name):
     """List task names in given run"""
@@ -70,14 +91,13 @@ def list_tasks(run_name):
         print(task_file.replace(".json", ""))
 
 
-@click.command()
+@odop_cli.command()
 @click.argument("run_name", type=str)
 def visualize_folder(run_name):
     """Visualize cpu utilization of all core"""
     from .visualization.visualize_all_folder import process_folder
 
-    runs_base_path = os.path.join(ODOP_PATH, "runs")
-    run_folder = os.path.join(runs_base_path, run_name)
+    run_folder = os.path.join(ODOP_RUNS_PATH, run_name)
     data_folder = os.path.join(run_folder, "metric_database")
 
     process_folder(data_folder)
@@ -96,7 +116,7 @@ def request_api(run_name, endpoint):
     return {}
 
 
-@click.command()
+@odop_cli.command()
 @click.argument("run_name", type=str)
 def queue_summary(run_name):
     """Return a summary of the task queue for a given run."""
@@ -106,7 +126,7 @@ def queue_summary(run_name):
     summaries = request_api(run_name, "task_queue_summary")
     for status in summaries:
         print(f"{status}: {summaries[status]}")
 
 
-@click.command()
+@odop_cli.command()
 @click.argument("run_name", type=str)
 def queue_status(run_name):
     """Return the status of tasks in a given run."""
@@ -119,7 +139,7 @@ def queue_status(run_name):
     print()
 
 
-@click.command()
+@odop_cli.command()
 @click.argument("run_name", type=str)
 @click.argument("task_id", type=int)
 def queue_detail(run_name, task_id):
@@ -128,7 +148,7 @@ def queue_detail(run_name, task_id):
     print("Task info:", task_info)
 
 
-@click.command()
+@odop_cli.command()
 @click.argument(
     "task_folder",
     required=False,
@@ -142,14 +162,56 @@ def check_tasks(task_folder):
 
     odop.scan_tasks_folder(task_folder, write=False)
 
 
-odop_cli.add_command(list_tasks)
-odop_cli.add_command(scan_tasks_folder)
-odop_cli.add_command(remove_task)
-odop_cli.add_command(visualize_folder)
-odop_cli.add_command(list_tasks)
-odop_cli.add_command(queue_summary)
-odop_cli.add_command(queue_status)
-odop_cli.add_command(queue_detail)
-odop_cli.add_command(check_tasks)
+@odop_cli.group()
+def benchmark():
+    """Manage benchmark records."""
+
+
+@benchmark.command("list")
+def benchmark_list():
+    """List known benchmark records."""
+    records: list[BenchmarkRecord] = get_benchmark_records()
+
+    # Size the name column to the longest run name, capped at 20 characters.
+    run_name_len = min(max((len(record.run_name) for record in records), default=0), 20)
+
+    print(f"{'ID': <36}\t{'RUN NAME': <{run_name_len}}")
+    for record in records:
+        print(f"{record.id: <36}\t{record.run_name: <{run_name_len}}")
+
+
+@benchmark.command("save")
+@click.argument("run_id", type=click.IntRange(0), required=True)
+def benchmark_save(run_id: int):
+    """Convert an ODOP run into a benchmark record."""
+    # TODO: Add support for selecting a run by name. This will require a prompt in case there are
+    # multiple runs with the same name.
+    runs: list[pathlib.Path] = get_runs()
+    if len(runs) == 0:
+        print("There are no runs to save. Aborting.")
+        return
+
+    if run_id > len(runs) - 1:
+        raise click.BadArgumentUsage(
+            f"Value {run_id} of 'run_id' is not valid. Use a value between 0 and {len(runs) - 1}."
+        )
+
+    selected_run: pathlib.Path = runs[run_id]
+    try:
+        record: BenchmarkRecord = BenchmarkRecord.from_run(selected_run)
+    except FileNotFoundError:
+        print(
+            f"Unexpected error: The run directory {selected_run} does not exist.",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    print(
+        f"Created a new benchmark record of run {record.run_name} with ID {record.id}."
+    )
 
 
 if __name__ == "__main__":
     odop_cli()
diff --git a/odop/common.py b/odop/common.py
index 9146b19..1ab106a 100644
--- a/odop/common.py
+++ b/odop/common.py
@@ -17,6 +17,8 @@ def create_logger(name, level=logging.INFO):
 
 logger = create_logger("odop")
 
+ODOP_CONF_FILENAME = "odop_conf.yaml"
+
 try:
     ODOP_PATH = os.getenv("ODOP_PATH")
 except KeyError:
@@ -26,7 +28,31 @@ def create_logger(name, level=logging.INFO):
 
 if ODOP_PATH is None:
     user_home = Path.home()
     # assume that we have odop dir
-    ODOP_PATH = os.path.join(user_home, ".odop")
+    ODOP_PATH = user_home / ".odop"
+else:
+    ODOP_PATH = Path(ODOP_PATH)
+
+if not ODOP_PATH.is_dir():
     os.makedirs(ODOP_PATH, exist_ok=True)
 
 RUN_ID = os.getenv("RUN_ID")
+
+ODOP_RUNS_PATH = ODOP_PATH / "runs"
+
+
+def get_runs() -> list[Path]:
+    runs = []
+
+    logger.info(f"Collecting runs from {ODOP_RUNS_PATH}.")
+
+    if not ODOP_RUNS_PATH.is_dir():
+        logger.info("The runs directory does not exist.")
+        return runs
+
+    for path in ODOP_RUNS_PATH.iterdir():
+        if not path.is_dir():
+            logger.debug(f"{path} is not a directory. Ignoring it.")
+            continue
+
+        runs.append(path)
+
+    return sorted(runs)
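
The RUN_ID argument of `benchmark save` is the index printed by `list-runs`: both commands walk the same `sorted()` list returned by `get_runs`, so the mapping is stable as long as no run directory is created or deleted between the two invocations. A small sketch of that contract (the run names are hypothetical):

    import pathlib

    # Stand-in for odop.common.get_runs(); sorted() is what fixes the ordering.
    runs = sorted([pathlib.Path("runs/beta"), pathlib.Path("runs/alpha")])

    # What `list-runs` prints: a stable index next to each run name.
    for idx, run in enumerate(runs):
        print(idx, run.name)  # 0 alpha, then 1 beta

    # What `benchmark save 1` resolves its RUN_ID to.
    print(runs[1].name)  # beta
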
Ignoring it.") + continue + + runs.append(path) + + return sorted(runs) diff --git a/odop/ui/ui.py b/odop/ui/ui.py index 38589a8..5da79cf 100644 --- a/odop/ui/ui.py +++ b/odop/ui/ui.py @@ -8,7 +8,7 @@ import odop import odop.scheduler as scheduler -from odop.common import ODOP_PATH, create_logger +from odop.common import ODOP_PATH, ODOP_RUNS_PATH, create_logger from odop.odop_obs import OdopObs from odop.ui import Status, read_config @@ -188,7 +188,7 @@ def start( ) run_name = runtime_config["run_name"] - self.run_folder = os.path.join(ODOP_PATH, "runs", run_name) + self.run_folder = os.path.join(ODOP_RUNS_PATH, run_name) os.makedirs(self.run_folder, exist_ok=True) self.status = Status(os.path.join(self.run_folder, "status")) self.status.reset() diff --git a/odop/utils/dashboard.py b/odop/utils/dashboard.py index c4474ca..ae7ab98 100644 --- a/odop/utils/dashboard.py +++ b/odop/utils/dashboard.py @@ -8,7 +8,7 @@ import streamlit as st from odop.cli.visualization.common import extract_data_from_file_path -from odop.common import ODOP_PATH +from odop.common import ODOP_RUNS_PATH @st.cache_data() @@ -59,12 +59,11 @@ def plot_monitoring_data_with_slider(timestamps, cpu_values, allowed_cpu_values) st.altair_chart(chart, use_container_width=True) -base_runs_path = os.path.join(ODOP_PATH, "runs") -chosen_run = st.selectbox("Which run you want to see", os.listdir(base_runs_path)) +chosen_run = st.selectbox("Which run you want to see", os.listdir(ODOP_RUNS_PATH)) node_to_choose = [ Path(filepath).stem for filepath in glob.glob( - os.path.join(base_runs_path, chosen_run, "metric_database/*.csv") + os.path.join(ODOP_RUNS_PATH, chosen_run, "metric_database/*.csv") ) ] chosen_node = st.selectbox( @@ -74,7 +73,7 @@ def plot_monitoring_data_with_slider(timestamps, cpu_values, allowed_cpu_values) timestamps, cpu_values, allowed_cpu_values = load_data( os.path.join( - base_runs_path, + ODOP_RUNS_PATH, chosen_run, "metric_database", f"{chosen_node}.csv", diff --git a/odop/utils/pages/current_optask.py b/odop/utils/pages/current_optask.py index 34dba2b..df176be 100644 --- a/odop/utils/pages/current_optask.py +++ b/odop/utils/pages/current_optask.py @@ -5,7 +5,7 @@ import streamlit as st import yaml -from odop.common import ODOP_PATH +from odop.common import ODOP_RUNS_PATH def query_data(hostname: str) -> list[dict]: @@ -28,9 +28,7 @@ def get_config_data(run_path: str): return None -base_runs_path = os.path.join(ODOP_PATH, "runs") - -run_paths = os.listdir(base_runs_path) +run_paths = os.listdir(ODOP_RUNS_PATH) run_paths = filter_run_dir(run_paths) run_paths.sort() diff --git a/odop/utils/pages/real_time_monitoring.py b/odop/utils/pages/real_time_monitoring.py index bd10b82..b7538f4 100644 --- a/odop/utils/pages/real_time_monitoring.py +++ b/odop/utils/pages/real_time_monitoring.py @@ -7,7 +7,7 @@ import streamlit as st import yaml -from odop.common import ODOP_PATH, create_logger +from odop.common import ODOP_RUNS_PATH, create_logger logger = create_logger("monitoring") @@ -32,21 +32,19 @@ def get_config_data(run_path: str): return None -base_runs_path = os.path.join(ODOP_PATH, "runs") - -run_paths = os.listdir(base_runs_path) +run_paths = os.listdir(ODOP_RUNS_PATH) run_paths = filter_run_dir(run_paths) run_paths.sort() chosen_run = st.selectbox("Which run you want to monitor", run_paths) node_names = list( - glob.glob(os.path.join(base_runs_path, chosen_run, "metric_database/*.csv")) + glob.glob(os.path.join(ODOP_RUNS_PATH, chosen_run, "metric_database/*.csv")) ) node_to_choose = [ Path(filepath).stem for 
diff --git a/odop/utils/pages/real_time_monitoring.py b/odop/utils/pages/real_time_monitoring.py
index bd10b82..b7538f4 100644
--- a/odop/utils/pages/real_time_monitoring.py
+++ b/odop/utils/pages/real_time_monitoring.py
@@ -7,7 +7,7 @@
 import streamlit as st
 import yaml
 
-from odop.common import ODOP_PATH, create_logger
+from odop.common import ODOP_RUNS_PATH, create_logger
 
 logger = create_logger("monitoring")
 
@@ -32,21 +32,19 @@ def get_config_data(run_path: str):
     return None
 
 
-base_runs_path = os.path.join(ODOP_PATH, "runs")
-
-run_paths = os.listdir(base_runs_path)
+run_paths = os.listdir(ODOP_RUNS_PATH)
 run_paths = filter_run_dir(run_paths)
 run_paths.sort()
 
 chosen_run = st.selectbox("Which run you want to monitor", run_paths)
 
 node_names = list(
-    glob.glob(os.path.join(base_runs_path, chosen_run, "metric_database/*.csv"))
+    glob.glob(os.path.join(ODOP_RUNS_PATH, chosen_run, "metric_database/*.csv"))
 )
 node_to_choose = [
     Path(filepath).stem
     for filepath in glob.glob(
-        os.path.join(base_runs_path, chosen_run, "metric_database/*.csv")
+        os.path.join(ODOP_RUNS_PATH, chosen_run, "metric_database/*.csv")
     )
 ]
 node_to_choose.sort()
diff --git a/pyproject.toml b/pyproject.toml
index 891c25f..40f29b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,7 @@ dependencies = [
     "pydantic-settings",
     "loguru>=0.7.2",
     "streamlit>=1.39.0",
+    "tinyflux",
 ]
 
 [project.optional-dependencies]
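
With the new `tinyflux` dependency in place, the metrics of a saved record are one or more TinyFlux CSV databases inside `metrics.tar.gz`, so reading them back is a matter of extracting the archive and reopening each file. A sketch under those assumptions, using only `TinyFlux(...)` and `.all()` from the tinyflux API; `<record-id>` is a placeholder for an ID printed by `benchmark list`:

    import pathlib
    import tarfile
    import tempfile

    import tinyflux

    record_path = pathlib.Path.home() / ".odop" / "benchmarks" / "<record-id>"

    with tempfile.TemporaryDirectory() as tmp:
        with tarfile.open(record_path / "metrics.tar.gz", "r:gz") as tar:
            tar.extractall(tmp)

        # The archive may hold several databases, split at the record threshold.
        for csv in sorted(pathlib.Path(tmp).rglob("metrics-*.csv")):
            db = tinyflux.TinyFlux(str(csv))
            print(csv.name, len(db.all()))
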