diff --git a/.gitignore b/.gitignore index d3ae9cd..6eaf67c 100644 --- a/.gitignore +++ b/.gitignore @@ -282,3 +282,5 @@ go.work.sum # Meta-internal CI skycastle/ scrut/ + +*.err \ No newline at end of file diff --git a/README.md b/README.md index 29769f7..741d1a0 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,8 @@ Each component has its own README with detailed guides: ## Possible Expansions -- Integration with more GPU types (AMD, Intel, Custom Accelerators) +- **AMD/ROCm GPU support (implemented)**: Use `gcm rocm_monitor` for AMD GPU metrics and `health_checks check_amd_smi` for AMD GPU health checks. Requires [amd-smi](https://rocm.docs.amd.com/projects/amdsmi/) or rocm-smi on PATH; device list in prolog/epilog uses `SLURM_JOB_GPUS`, `CUDA_VISIBLE_DEVICES` (NVIDIA), or `ROCR_VISIBLE_DEVICES` (AMD). +- Integration with more GPU types (Intel, Custom Accelerators) - Support for additional schedulers beyond Slurm - [Additional Slurm related Monitoring](gcm/docs/adding_new_collector.md) - [Support for new exporters](gcm/docs/adding_new_exporter.md) diff --git a/gcm/bin/generate_features.py b/gcm/bin/generate_features.py index e8c4122..d777411 100644 --- a/gcm/bin/generate_features.py +++ b/gcm/bin/generate_features.py @@ -51,7 +51,9 @@ def load_config(self) -> Dict[str, Any]: f"Error reading toml file. {{{class_name}.config_path}} does not contain valid TOML. Error: {{e}}" ) raise tomli.TOMLDecodeError( - f"{{{class_name}.config_path}} does not contain valid TOML.", + msg=f"{{{class_name}.config_path}} does not contain valid TOML. 
{{e.msg}}", + doc=e.doc, + pos=e.pos, ) from e else: raise ValueError( diff --git a/gcm/health_checks/README.md b/gcm/health_checks/README.md index aa452f4..05058ea 100644 --- a/gcm/health_checks/README.md +++ b/gcm/health_checks/README.md @@ -69,6 +69,7 @@ $ health_checks --features-config=$features_path --config=$config_path check-dcg - [check-syslogs](#check-syslogs) - [cuda-memtest](#cuda-memtest) - [check-nccl](#check-nccl) +- [check-rccl](#check-rccl) - [check-hca](#check-hca) - [check-storage](#check-storage) - [check-ipmitool](#check-ipmitool) @@ -211,6 +212,28 @@ $ health_checks check-nccl fair_cluster prolog -p all_reduce --pairwise --hostli $ health_checks check-nccl fair_cluster prolog -p all_reduce --pairwise-quick --hostlist=$SLURM_JOB_NODELIST --nccl-tdir /shared/home/abinesh/nccl-tests/build/ --critical-threshold 100 --sink=do_nothing ``` +# check-rccl
+Run RCCL (ROCm Communication Collectives Library) tests on AMD GPU nodes. Analogous to check-nccl for NVIDIA nodes. +1. Run single node RCCL tests (e.g. from ROCm/rccl-tests build) +2. Run pairwise RCCL tests + +File: `gcm/health_checks/checks/check_rccl.py` + +Example of execution: +```shell +# For a list of the available options +$ health_checks check-rccl --help + +# Single node all_reduce_perf RCCL test +$ health_checks check-rccl fair_cluster prolog -p all_reduce --rccl-tdir /opt/rccl-tests/build/ --critical-threshold 18 --sink=do_nothing + +# Pairwise all_reduce_perf RCCL test (hostlist required) +$ health_checks check-rccl fair_cluster prolog -p all_reduce --pairwise --hostlist=node-[1-4] --rccl-tdir /opt/rccl-tests/build/ --critical-threshold 100 --sink=do_nothing + +# Quick pairwise - each node covered once. SLURM_JOB_NODELIST can be used when running inside SLURM. +$ health_checks check-rccl fair_cluster prolog -p all_reduce --pairwise-quick --hostlist=$SLURM_JOB_NODELIST --rccl-tdir /opt/rccl-tests/build/ --critical-threshold 100 --sink=do_nothing +``` + # check-hca
Check if HCAs are present and count matches the expectation. diff --git a/gcm/health_checks/checks/__init__.py b/gcm/health_checks/checks/__init__.py index f72ff8d..f655a25 100644 --- a/gcm/health_checks/checks/__init__.py +++ b/gcm/health_checks/checks/__init__.py @@ -1,6 +1,7 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. from gcm.health_checks.checks.check_airstore import check_airstore +from gcm.health_checks.checks.check_amd_smi import check_amd_smi from gcm.health_checks.checks.check_authentication import check_authentication from gcm.health_checks.checks.check_blockdev import check_blockdev from gcm.health_checks.checks.check_dcgmi import check_dcgmi @@ -9,6 +10,7 @@ from gcm.health_checks.checks.check_ibstat import check_ib from gcm.health_checks.checks.check_ipmitool import check_ipmitool from gcm.health_checks.checks.check_nccl import check_nccl +from gcm.health_checks.checks.check_rccl import check_rccl from gcm.health_checks.checks.check_node import check_node from gcm.health_checks.checks.check_nvidia_smi import check_nvidia_smi from gcm.health_checks.checks.check_pci import check_pci @@ -25,11 +27,13 @@ __all__ = [ "check_ssh_certs", "check_airstore", + "check_amd_smi", "check_telemetry", "check_dcgmi", "check_hca", "check_nvidia_smi", "check_nccl", + "check_rccl", "check_syslogs", "check_process", "cuda", diff --git a/gcm/health_checks/checks/check_amd_smi.py b/gcm/health_checks/checks/check_amd_smi.py new file mode 100644 index 0000000..4ae2147 --- /dev/null +++ b/gcm/health_checks/checks/check_amd_smi.py @@ -0,0 +1,518 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+"""AMD SMI / ROCm GPU health checks using amd-smi or rocm-smi.""" + +import logging +import os +import socket +import sys +import time +from contextlib import ExitStack +from dataclasses import dataclass +from typing import ( + Any, + Collection, + List, + Literal, + Optional, + Protocol, + Tuple, +) + +import click + +import gni_lib +import psutil +from gcm.health_checks.check_utils.output_context_manager import OutputContext +from gcm.health_checks.check_utils.telem import TelemetryContext +from gcm.health_checks.click import common_arguments, telemetry_argument +from gcm.health_checks.device_telemetry_exception_handling import ( + handle_device_telemetry_exception, +) +from gcm.health_checks.device_telemetry_utils import get_gpu_devices +from gcm.health_checks.env_variables import EnvCtx +from gcm.health_checks.measurement_units import convert_bytes +from gcm.health_checks.types import CHECK_TYPE, CheckEnv, ExitCode +from gcm.monitoring.click import heterogeneous_cluster_v1_option + +from gcm.monitoring.device_telemetry_client import ( + DeviceTelemetryClient, + DeviceTelemetryException, +) +from gcm.monitoring.device_telemetry_rocm import ROCmDeviceTelemetryClient +from gcm.monitoring.features.gen.generated_features_healthchecksfeatures import ( + FeatureValueHealthChecksFeatures, +) +from gcm.monitoring.slurm.derived_cluster import get_derived_cluster +from gcm.monitoring.utils.monitor import init_logger +from gcm.schemas.gpu.process import ProcessInfo +from gcm.schemas.health_check.health_check_name import HealthCheckName +from typeguard import typechecked + +from gcm.health_checks.checks.check_nvidia_smi import ( + attempt_check_running_procs, + kill_processes, +) + + +class AmdSmiCli(CheckEnv, Protocol): + def get_device_telemetry(self) -> DeviceTelemetryClient: ... 
+ + +@dataclass +class AmdSmiCliImpl: + cluster: str + type: str + log_level: str + log_folder: str + + def get_device_telemetry(self) -> DeviceTelemetryClient: + return ROCmDeviceTelemetryClient() + + +def _check_gpu_num( + device_telemetry: DeviceTelemetryClient, + expected_gpus: int, + logger: logging.Logger, +) -> Tuple[ExitCode, str]: + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi_gpu_num(): + msg = f"{HealthCheckName.AMD_SMI_GPU_NUM.value} is disabled by killswitch." + logger.info(msg) + return ExitCode.OK, msg + try: + present_gpus = device_telemetry.get_device_count() + except DeviceTelemetryException as e: + return handle_device_telemetry_exception(e) + if present_gpus != expected_gpus: + msg = f"gpu_num check: exit_code: {ExitCode.CRITICAL}, Number of GPUs present, {present_gpus}, is different than expected, {expected_gpus}\n" + return ExitCode.CRITICAL, msg + msg = f"gpu_num check: exit_code: {ExitCode.OK}, Number of GPUs present is the same as expected, {expected_gpus}\n" + return ExitCode.OK, msg + + +def _check_running_procs( + device_telemetry: DeviceTelemetryClient, + type: CHECK_TYPE, + logger: logging.Logger, +) -> Tuple[ExitCode, str]: + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi_running_procs(): + msg = f"{HealthCheckName.AMD_SMI_RUNNING_PROCS.value} is disabled by killswitch." + logger.info(msg) + return ExitCode.OK, msg + try: + devices = get_gpu_devices(device_telemetry, type) + except DeviceTelemetryException as e: + return handle_device_telemetry_exception(e) + if not devices: + return ExitCode.OK, "running_procs check: No GPU devices were found." 
+ msg = "" + with EnvCtx({"ROCR_VISIBLE_DEVICES": None}): + _, exit_code, msg = attempt_check_running_procs( + 0, devices, msg, device_telemetry + ) + if exit_code == ExitCode.OK: + msg = f"running_procs check: No other process is occupying any of the following GPUs: {devices}.\n" + return exit_code, msg + + +def _check_and_kill_running_procs( + device_telemetry: DeviceTelemetryClient, + type: CHECK_TYPE, + retry_count: int, + retry_interval: int, + force_kill_process: bool, + logger: logging.Logger, +) -> Tuple[ExitCode, str]: + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi_running_procs_and_kill(): + msg = f"{HealthCheckName.AMD_SMI_RUNNING_PROCS.value} is disabled by killswitch." + logger.info(msg) + return ExitCode.OK, msg + try: + devices = get_gpu_devices(device_telemetry, type) + except DeviceTelemetryException as e: + return handle_device_telemetry_exception(e) + if not devices: + return ExitCode.OK, "running_procs check: No GPU devices were found." 
+ exit_code = ExitCode.OK + msg = "" + pids: List[ProcessInfo] = [] + with EnvCtx({"ROCR_VISIBLE_DEVICES": None}): + for attempt in range(retry_count): + (pids, attempt_exit_code, msg) = attempt_check_running_procs( + attempt, devices, msg, device_telemetry + ) + if attempt_exit_code == ExitCode.OK: + exit_code = ExitCode.OK if attempt == 0 else ExitCode.WARN + break + elif attempt == retry_count - 1: + exit_code = attempt_exit_code + else: + time.sleep(retry_interval) + if force_kill_process and pids: + proc_pids = [p_id.pid for p_id in pids] + msg += f"running_procs check: force killed pids: {proc_pids}\n" + (is_killed, msg) = kill_processes( + proc_pids, retry_count, devices, msg, device_telemetry + ) + if is_killed: + msg += f"running_procs check: pids are successfully killed: {proc_pids}\n" + exit_code = ExitCode.OK + else: + msg += f"running_procs check: pids are not killed: {proc_pids}\n" + exit_code = ExitCode.CRITICAL + if exit_code == ExitCode.OK: + msg += f"running_procs check: No other process is occupying any of the following GPUs: {devices}.\n" + return exit_code, msg + + +def _check_app_clock_freq( + device_telemetry: DeviceTelemetryClient, + gpu_app_freq: int, + gpu_app_mem_freq: int, + logger: logging.Logger, +) -> Tuple[ExitCode, str]: + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi_clock_freq(): + msg = f"{HealthCheckName.AMD_SMI_CLOCK_FREQ.value} is disabled by killswitch." 
+ logger.info(msg) + return ExitCode.OK, msg + exit_code = ExitCode.OK + msg = "" + try: + device_count = device_telemetry.get_device_count() + except DeviceTelemetryException as e: + return handle_device_telemetry_exception(e) + for device in range(device_count): + try: + handle = device_telemetry.get_device_by_index(device) + clock_info = handle.get_clock_freq() + except DeviceTelemetryException as e: + error_code, error_msg = handle_device_telemetry_exception(e) + if error_code > exit_code: + exit_code = error_code + msg += f"clock_freq check: GPU {device}: {error_msg}" + else: + if ( + clock_info.graphics_freq < gpu_app_freq + or clock_info.memory_freq < gpu_app_mem_freq + ): + msg += f"clock_freq check: exit_code: {ExitCode.CRITICAL}, GPU {device} has less application freq than expected. Expected: (GPU, GPU_mem) {gpu_app_freq}, {gpu_app_mem_freq} and got {clock_info.graphics_freq}, {clock_info.memory_freq}.\n" + exit_code = ExitCode.CRITICAL + if exit_code == ExitCode.OK: + msg = f"clock_freq check: exit_code: {ExitCode.OK}, Application frequencies are as expected.\n" + return exit_code, msg + + +def _check_gpu_temp( + device_telemetry: DeviceTelemetryClient, + gpu_temperature_threshold: Optional[int], + logger: logging.Logger, +) -> Tuple[ExitCode, str]: + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi_gpu_temp(): + msg = f"{HealthCheckName.AMD_SMI_GPU_TEMP.value} is disabled by killswitch." 
+ logger.info(msg) + return ExitCode.OK, msg + if gpu_temperature_threshold is None: + return ( + ExitCode.CRITICAL, + "gpu_temperature_threshold should not be None", + ) + exit_code = ExitCode.OK + msg = "" + try: + device_count = device_telemetry.get_device_count() + except DeviceTelemetryException as e: + return handle_device_telemetry_exception(e) + for device in range(device_count): + try: + handle = device_telemetry.get_device_by_index(device) + gpu_temperature = handle.get_temperature() + except DeviceTelemetryException as e: + error_code, error_msg = handle_device_telemetry_exception(e) + if error_code > exit_code: + exit_code = error_code + msg += f"gpu_temp check: GPU {device}: {error_msg}" + else: + if gpu_temperature > gpu_temperature_threshold: + exit_code = ExitCode.CRITICAL + msg += f"gpu_temp check: exit_code: {ExitCode.CRITICAL}, GPU {device} has temperature: {gpu_temperature}, higher than critical threshold of {gpu_temperature_threshold}.\n" + if exit_code == ExitCode.OK: + msg = f"gpu_temp check: exit_code: {ExitCode.OK}, all GPU temperatures are lower than max threshold, {gpu_temperature_threshold}.\n" + return exit_code, msg + + +def _check_mem_usage( + device_telemetry: DeviceTelemetryClient, + type: CHECK_TYPE, + gpu_mem_usage_threshold: int, + logger: logging.Logger, +) -> Tuple[ExitCode, str]: + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi_mem_usage(): + msg = f"{HealthCheckName.AMD_SMI_MEM_USAGE.value} is disabled by killswitch." + logger.info(msg) + return ExitCode.OK, msg + try: + devices = get_gpu_devices(device_telemetry, type) + except DeviceTelemetryException as e: + return handle_device_telemetry_exception(e) + if not devices: + return ExitCode.OK, "mem_usage check: No GPU devices were found." 
+ exit_code = ExitCode.OK + msg = "" + with EnvCtx({"ROCR_VISIBLE_DEVICES": None}): + for device in devices: + try: + handle = device_telemetry.get_device_by_index(device) + memory_info = handle.get_memory_info() + except DeviceTelemetryException as e: + error_code, error_msg = handle_device_telemetry_exception(e) + if error_code > exit_code: + exit_code = error_code + msg += f"mem_usage check: GPU {device}: {error_msg}" + else: + if convert_bytes(memory_info.used, "MiB") > gpu_mem_usage_threshold: + msg += f"mem_usage check: GPU {device} mem usage: {convert_bytes(memory_info.used, 'MiB')} is higher than threshold: {gpu_mem_usage_threshold}.\n" + exit_code = ExitCode.CRITICAL + if exit_code == ExitCode.OK: + msg = f"mem_usage check: all GPUs have mem usage lower than threshold: {gpu_mem_usage_threshold}.\n" + return exit_code, msg + + +class TemperatureRequiredOption(click.Option): + def process_value(self, ctx: click.Context, value: Any) -> Any: + value = super().process_value(ctx, value) + if value is None and "gpu_temperature" in ctx.params["check"]: + msg = "gpu_temperature_threshold is required for gpu_temperature check" + raise click.MissingParameter(ctx=ctx, param=self, message=msg) + return value + + +@click.command() +@common_arguments +@telemetry_argument +@heterogeneous_cluster_v1_option +@click.option( + "--check", + "-c", + type=click.Choice( + [ + "gpu_num", + "running_procs", + "running_procs_and_kill", + "clock_freq", + "gpu_temperature", + "gpu_mem_usage", + ], + ), + required=True, + multiple=True, + help="Select the checks to perform. 
Can select more than 1 of the options.", +) +@click.option("--gpu_num", type=click.INT, default=8) +@click.option( + "--gpu_app_freq", + type=click.INT, + default=800, + help="Select what the GPU application frequency should be (MHz).", +) +@click.option( + "--gpu_app_mem_freq", + type=click.INT, + default=800, + help="Select what the GPU memory application frequency should be (MHz).", +) +@click.option( + "--gpu_temperature_threshold", + type=click.INT, + cls=TemperatureRequiredOption, + help="Maximum GPU temperature threshold in Celsius. Required if gpu_temperature check is selected.", +) +@click.option( + "--gpu_mem_usage_threshold", + type=click.INT, + default=15, + help="Maximum GPU memory usage threshold MiB.", +) +@click.option( + "--running_procs_retry_count", + type=click.INT, + default=3, + help="Number of retries for running process checks.", +) +@click.option( + "--running_procs_interval", + type=click.INT, + default=3, + help="Wait between running process check retries in seconds.", +) +@click.option( + "--running_procs_force_kill", + type=click.BOOL, + default=False, + help="Whether the health check should force-kill the running process.", +) +@click.pass_obj +@typechecked +def check_amd_smi( + obj: Optional[AmdSmiCli], + cluster: str, + type: CHECK_TYPE, + log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + log_folder: str, + sink: str, + sink_opts: Collection[str], + verbose_out: bool, + heterogeneous_cluster_v1: bool, + check: Tuple[str, ...], + gpu_num: int, + gpu_app_freq: int, + gpu_app_mem_freq: int, + gpu_temperature_threshold: Optional[int], + gpu_mem_usage_threshold: int, + running_procs_retry_count: int, + running_procs_interval: int, + running_procs_force_kill: bool, +) -> None: + """Perform AMD SMI / ROCm checks to assess the state of AMD GPUs.""" + node: str = socket.gethostname() + logger, _ = init_logger( + logger_name=type, + log_dir=os.path.join(log_folder, type + "_logs"), + log_name=node + ".log", + 
log_level=getattr(logging, log_level), + ) + logger.info( + f"check_amd_smi: check: {check} cluster: {cluster}, node: {node}, type: {type}" + ) + try: + gpu_node_id = gni_lib.get_gpu_node_id() + except Exception as e: + gpu_node_id = None + logger.warning(f"Could not get gpu_node_id, likely not a GPU host: {e}") + derived_cluster = get_derived_cluster( + cluster=cluster, + heterogeneous_cluster_v1=heterogeneous_cluster_v1, + data={"Node": node}, + ) + if obj is None: + obj = AmdSmiCliImpl(cluster, type, log_level, log_folder) + overall_exit_code = ExitCode.UNKNOWN + overall_msg = "" + try: + device_telemetry = obj.get_device_telemetry() + except DeviceTelemetryException as e: + with ExitStack() as s: + s.enter_context( + TelemetryContext( + sink=sink, + sink_opts=sink_opts, + logger=logger, + cluster=cluster, + derived_cluster=derived_cluster, + type=type, + name=HealthCheckName.AMD_SMI.value, + node=node, + get_exit_code_msg=lambda: (overall_exit_code, overall_msg), + gpu_node_id=gpu_node_id, + ) + ) + s.enter_context( + OutputContext( + type, + HealthCheckName.AMD_SMI, + lambda: (overall_exit_code, overall_msg), + verbose_out, + ) + ) + overall_exit_code, overall_msg = handle_device_telemetry_exception(e) + logger.info( + f"Exception during ROCm telemetry init. 
exit_code: {overall_exit_code} msg: {overall_msg}" + ) + sys.exit(overall_exit_code.value) + amd_check = [ + ( + "gpu_num", + HealthCheckName.AMD_SMI_GPU_NUM, + lambda: _check_gpu_num(device_telemetry, gpu_num, logger), + ), + ( + "clock_freq", + HealthCheckName.AMD_SMI_CLOCK_FREQ, + lambda: _check_app_clock_freq( + device_telemetry, gpu_app_freq, gpu_app_mem_freq, logger + ), + ), + ( + "running_procs", + HealthCheckName.AMD_SMI_RUNNING_PROCS, + lambda: _check_running_procs(device_telemetry, type, logger), + ), + ( + "running_procs_and_kill", + HealthCheckName.AMD_SMI_RUNNING_PROCS_AND_KILL, + lambda: _check_and_kill_running_procs( + device_telemetry, + type, + running_procs_retry_count, + running_procs_interval, + running_procs_force_kill, + logger, + ), + ), + ( + "gpu_temperature", + HealthCheckName.AMD_SMI_GPU_TEMP, + lambda: _check_gpu_temp( + device_telemetry, gpu_temperature_threshold, logger + ), + ), + ( + "gpu_mem_usage", + HealthCheckName.AMD_SMI_MEM_USAGE, + lambda: _check_mem_usage( + device_telemetry, type, gpu_mem_usage_threshold, logger + ), + ), + ] + with OutputContext( + type, + HealthCheckName.AMD_SMI, + lambda: (overall_exit_code, overall_msg), + verbose_out, + ): + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_amd_smi(): + overall_exit_code = ExitCode.OK + overall_msg = ( + f"{HealthCheckName.AMD_SMI.value} is disabled by killswitch." 
+ ) + logger.info(overall_msg) + sys.exit(overall_exit_code.value) + for check_id, check_name, run_check in amd_check: + if check_id not in check: + continue + exit_code = ExitCode.UNKNOWN + msg = "" + with TelemetryContext( + sink=sink, + sink_opts=sink_opts, + logger=logger, + cluster=cluster, + derived_cluster=derived_cluster, + type=type, + name=check_name.value, + node=node, + get_exit_code_msg=lambda: (exit_code, msg), + gpu_node_id=gpu_node_id, + ): + exit_code, msg = run_check() + overall_msg += msg + if exit_code > overall_exit_code: + overall_exit_code = exit_code + logger.info(f"Overall exit code {overall_exit_code}\n{overall_msg}") + sys.exit(overall_exit_code.value) diff --git a/gcm/health_checks/checks/check_rccl.py b/gcm/health_checks/checks/check_rccl.py new file mode 100644 index 0000000..c9e1f18 --- /dev/null +++ b/gcm/health_checks/checks/check_rccl.py @@ -0,0 +1,447 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +"""RCCL (ROCm Communication Collectives Library) health check for AMD GPU nodes.""" + +import itertools +import logging +import os +import re +import socket +import sys +from contextlib import ExitStack +from dataclasses import dataclass +from typing import ( + Any, + Callable, + Collection, + get_args, + List, + Literal, + NoReturn, + Optional, + Tuple, +) + +import click + +import gni_lib +from gcm.health_checks.check_utils.output_context_manager import OutputContext +from gcm.health_checks.check_utils.telem import TelemetryContext +from gcm.health_checks.click import ( + common_arguments, + telemetry_argument, + timeout_argument, +) +from gcm.health_checks.subprocess import ( + handle_subprocess_exception, + shell_command, + ShellCommandOut, +) +from gcm.health_checks.types import CHECK_TYPE, ExitCode, LOG_LEVEL +from gcm.monitoring.click import heterogeneous_cluster_v1_option + +from gcm.monitoring.features.gen.generated_features_healthchecksfeatures import ( + FeatureValueHealthChecksFeatures, +) 
+from gcm.monitoring.slurm.derived_cluster import get_derived_cluster +from gcm.monitoring.slurm.nodelist_parsers import nodelist +from gcm.monitoring.utils.monitor import init_logger +from gcm.schemas.health_check.health_check_name import HealthCheckName + +FnShellCommand = Callable[[str, int], ShellCommandOut] + +Flavor = Literal["single", "pairwise", "pairwise-quick"] +RCCL_OPERATION = Literal["all_gather", "all_reduce", "alltoall"] + + +class PairwiseRequiredOptionRCCL(click.Option): + """Require hostlist when flavor is pairwise or pairwise-quick.""" + + def process_value(self, ctx: click.Context, value: Any) -> Any: + value = super(PairwiseRequiredOptionRCCL, self).process_value(ctx, value) + + if value is None and ctx.params["flavor"] != "single": + msg = "Host list required for pairwise RCCL testing" + raise click.MissingParameter(ctx=ctx, param=self, message=msg) + + return value + + +@dataclass +class RCCLTestProcessedOutput: + message: str + exitcode: ExitCode + stdout: Optional[str] + innererrorcode: Optional[int] + + +def get_hosts( + flavor: Flavor, + hostlist: Optional[str], + logger: logging.Logger, + timeout: Optional[int] = None, +) -> List[Tuple[str, ...]]: + hosts: List[Tuple[str, ...]] + + hostnames = [socket.gethostname()] + + if hostlist is not None: + hostlist_parser = nodelist() + parsed, unparsed = hostlist_parser(hostlist) + + if parsed is None: + logger.info(f'Invalid hostlist: "{unparsed}"') + raise click.BadParameter( + message=f'Invalid hostlist: "{unparsed}"', + param_hint="--hostlist", + ) + + logger.info(f'Parsed hostlist: "{parsed}"') + hostnames = parsed + + if flavor == "single": + return [(host,) for host in hostnames] + + if len(hostnames) < 2: + raise click.BadParameter( + message="Need to specify at least two hosts in the hostlists when used with any of the pairwise options", + param_hint="--hostlist", + ) + + if flavor == "pairwise": + return list(itertools.combinations(hostnames, 2)) + + if flavor == "pairwise-quick": + 
hosts = list(zip(hostnames[::2], hostnames[1::2])) + if len(hostnames) % 2: + hosts += [(hostnames[-1], hostnames[0])] + return hosts + + def assert_never(value: NoReturn) -> NoReturn: + raise TypeError(f"Unhandled value : {value}") + + assert_never(flavor) + + +def get_avg_bus_bw(output: ShellCommandOut) -> Optional[float]: + """Parse avg bus bandwidth (GB/s) from RCCL test stdout (same format as nccl-tests).""" + if output.returncode > 0: + return None + # RCCL rccl-tests use same "Avg bus bandwidth" line as nccl-tests + text = output.stdout or "" + if "Avg bus bandwidth" not in text and "Bus bandwidth" not in text: + return None + for line in text.split("\n"): + if "Avg bus bandwidth" in line or "Bus bandwidth" in line: + match = re.search(r"[-+]?(\d*\.*\d+)", line) + if match: + return float(match.group()) + return None + + +def process_rccl_test_output( + output: ShellCommandOut, + op: RCCL_OPERATION, + critical_threshold: float, + warn_threshold: Optional[float], +) -> RCCLTestProcessedOutput: + processed_output = RCCLTestProcessedOutput( + f"RCCL Test - {op} - FAILED to run.", + ExitCode.CRITICAL, + output.stdout, + output.returncode, + ) + if output.returncode > 0: + processed_output.exitcode = ExitCode.WARN + return processed_output + + avg_bus_bw = get_avg_bus_bw(output) + + if avg_bus_bw is None: + processed_output.exitcode = ExitCode.WARN + return processed_output + + if avg_bus_bw < critical_threshold: + processed_output.message = ( + f"RCCL Test - {op} - ran successfully. " + "But bus bandwidth value lower than critical threshold." + ) + processed_output.exitcode = ExitCode.CRITICAL + return processed_output + + if warn_threshold is not None and avg_bus_bw < warn_threshold: + processed_output.message = ( + f"RCCL Test - {op} - ran successfully. " + "But bus bandwidth value lower than warning threshold." 
+ ) + processed_output.exitcode = ExitCode.WARN + return processed_output + + processed_output.message = f"RCCL Test - {op} - ran successfully" + processed_output.exitcode = ExitCode.OK + return processed_output + + +@click.command() +@common_arguments +@timeout_argument +@telemetry_argument +@heterogeneous_cluster_v1_option +@click.option( + "--single", + "flavor", + flag_value="single", + default=True, + help="Use for single node RCCL testing", + show_default=True, +) +@click.option( + "--pairwise", + "--pairwise-exhaustive", + "flavor", + flag_value="pairwise", + help="Use for pairwise RCCL testing for all possible pairs in the hostlist. " + "If hostlist is node[1-3], then this will run pairwise RCCL tests on pairs (node1, node2), " + "(node1, node3) and (node2, node3).", + show_default=True, +) +@click.option( + "--pairwise-quick", + "flavor", + flag_value="pairwise-quick", + help="Use for pairwise RCCL testing such that each node in the hostlist is covered just once. " + "If hostlist is node[1-3], then this will run pairwise RCCL tests on pairs (node1, node2) " + "and (node1, node3)", + show_default=True, +) +@click.option( + "--np", + "-n", + "copies", + type=int, + help="Run this many copies of the program on the given nodes. " + "By default this will be set to (num of nodes) * (num of gpus per node)", +) +@click.option( + "--gpus-per-node", + type=int, + default=8, + show_default=True, + help="Number of GPUs in each compute node. This will be used to set " + "the num of copies to run on the given node as (num of nodes) * (num of gpus per node)", +) +@click.option( + "--mpi-binpath", + type=click.Path(dir_okay=False), + help="Path to the mpirun binary.", +) +@click.option( + "--hostlist", + type=str, + cls=PairwiseRequiredOptionRCCL, + help="List of hosts to run the tests. " + "For --single option, hostlist is optional. 
" + "If specified, the test will be run on each of the node in this hostlist, else just on the local host.\n" + "For --pairwise option, the test will be run on each pair of nodes in this hostlist (required)", +) +@click.option( + "--mpi-opts", + type=str, + help="Options to pass to the underlying mpirun command. " + "Default includes: -mca coll_hcoll_enable 0 --bind-to numa. " + "See https://www.open-mpi.org/doc/current/man1/mpirun.1.php", + default="-mca coll_hcoll_enable 0 --bind-to numa", + show_default=True, +) +@click.option( + "exports", + "--export", + "-x", + type=str, + multiple=True, + help="Export the specified environment variables before executing the program. " + "Only one environment variable can be specified per -x option.", + default=[ + "HSA_FORCE_FINE_GRAIN_PCIE=1", + "GPU_DEVICE_ORDINAL=PCI_BUS_ID", + "NCCL_SOCKET_IFNAME=eth0", + "NCCL_DEBUG=WARN", + "NCCL_IB_PCI_RELAXED_ORDERING=1", + ], + show_default=True, +) +@click.option( + "--rccl-tdir", + type=click.Path(file_okay=False), + help="Path to the directory with the RCCL test binaries (e.g. from ROCm/rccl-tests build).", + required=True, +) +@click.option( + "--rccl-topts", + type=str, + help="RCCL test options. Same as nccl-tests: -g 1 -b 32M -e 1G -f 2 etc. " + "See https://github.com/ROCm/rccl-tests", + default="-g 1 -b 32M -e 1G -f 2", + show_default=True, +) +@click.option( + "--op", + "-p", + "operations", + type=click.Choice(get_args(RCCL_OPERATION)), + multiple=True, + help="RCCL collective operations to run. 
" + "Multiple operations can be specified, but only one operation per -p option", + required=True, +) +@click.option( + "--critical-threshold", + type=float, + help="Command exits with a critical exit code if avg bus bw value (in GB/s) is below this threshold", + required=True, +) +@click.option( + "--warn-threshold", + type=float, + help="Command exits with a warning exit code if avg bus bw value (in GB/s) is below this threshold", +) +@click.pass_obj +def check_rccl( + obj: Optional[FnShellCommand], + cluster: str, + type: CHECK_TYPE, + log_level: LOG_LEVEL, + log_folder: str, + timeout: int, + sink: str, + sink_opts: Collection[str], + verbose_out: bool, + heterogeneous_cluster_v1: bool, + flavor: Flavor, + copies: Optional[int], + gpus_per_node: int, + mpi_binpath: Optional[str], + hostlist: Optional[str], + mpi_opts: str, + exports: Tuple[str], + rccl_tdir: str, + rccl_topts: str, + operations: Tuple[RCCL_OPERATION], + critical_threshold: float, + warn_threshold: Optional[float], +) -> None: + """ + Run RCCL (ROCm Communication Collectives Library) tests to check both the + performance and the correctness of RCCL operations on AMD GPU nodes. + Analogous to check-nccl for NVIDIA nodes. 
+ """ + node: str = socket.gethostname() + + logger, _ = init_logger( + logger_name=type, + log_dir=os.path.join(log_folder, type + "_logs"), + log_name=node + ".log", + log_level=getattr(logging, log_level), + ) + + logger.info(f"check_rccl: cluster: {cluster}, node: {node}, type: {type}") + try: + gpu_node_id = gni_lib.get_gpu_node_id() + except Exception as e: + gpu_node_id = None + logger.warning(f"Could not get gpu_node_id, likely not a GPU host: {e}") + + derived_cluster = get_derived_cluster( + cluster=cluster, + heterogeneous_cluster_v1=heterogeneous_cluster_v1, + data={"Node": node}, + ) + + mpirun_cmd = ["mpirun" if not mpi_binpath else mpi_binpath] + + hosts: List[Tuple[str, ...]] = get_hosts(flavor, hostlist, logger, timeout) + + cmd_args: List[str] = [ + "--np", + str( + gpus_per_node * (1 if flavor == "single" else 2) + if copies is None + else copies + ), + ] + + mpi_opts = mpi_opts.strip() + if mpi_opts: + cmd_args += [mpi_opts] + + if exports: + for export in exports: + cmd_args += ["-x", export] + + runner = obj + if runner is None: + runner = shell_command + + outputs: List[RCCLTestProcessedOutput] = [] + exit_code = ExitCode.UNKNOWN + msg = "" + with ExitStack() as s: + s.enter_context( + TelemetryContext( + sink=sink, + sink_opts=sink_opts, + logger=logger, + cluster=cluster, + derived_cluster=derived_cluster, + type=type, + name=HealthCheckName.RCCL_TESTS.value, + node=node, + get_exit_code_msg=lambda: (exit_code, msg), + gpu_node_id=gpu_node_id, + ) + ) + s.enter_context( + OutputContext( + type, HealthCheckName.RCCL_TESTS, lambda: (exit_code, msg), verbose_out + ) + ) + ff = FeatureValueHealthChecksFeatures() + if ff.get_healthchecksfeatures_disable_rccl_tests(): + exit_code = ExitCode.OK + msg = f"{HealthCheckName.RCCL_TESTS.value} is disabled by killswitch." 
+ logger.info(msg) + sys.exit(exit_code.value) + for host in hosts: + for op in operations: + op_bin = rccl_tdir.rstrip("/") + "/" + op + "_perf" + host_arg = [ + "--host", + ",".join([f"{hostname}:{gpus_per_node}" for hostname in host]), + ] + cmd = mpirun_cmd + host_arg + cmd_args + [op_bin, rccl_topts] + cmd_str = " ".join(cmd) + + logger.info(f"Running command '{cmd_str}'") + try: + output: ShellCommandOut = runner(cmd_str, timeout) + except Exception as e: + output = handle_subprocess_exception(e) + + processed_output: RCCLTestProcessedOutput = process_rccl_test_output( + output, op, critical_threshold, warn_threshold + ) + msg = f"Exit Code {processed_output.exitcode.value}: {processed_output.message}" + logger.info(msg) + logger.info(f"Output:\n{processed_output.stdout}") + print(processed_output.stdout or "") + + outputs += [processed_output] + + if any(output.exitcode == ExitCode.CRITICAL for output in outputs): + exit_code = ExitCode.CRITICAL + elif any(output.exitcode == ExitCode.WARN for output in outputs): + exit_code = ExitCode.WARN + else: + exit_code = ExitCode.OK + + sys.exit(exit_code.value) diff --git a/gcm/health_checks/cli/health_checks.py b/gcm/health_checks/cli/health_checks.py index 44b8660..165afeb 100644 --- a/gcm/health_checks/cli/health_checks.py +++ b/gcm/health_checks/cli/health_checks.py @@ -38,7 +38,9 @@ def health_checks(detach: bool) -> None: checks.check_dcgmi, checks.check_hca, checks.check_nccl, + checks.check_rccl, checks.check_nvidia_smi, + checks.check_amd_smi, checks.check_syslogs, checks.check_process, checks.cuda, diff --git a/gcm/health_checks/config/config.toml b/gcm/health_checks/config/config.toml index d579b15..78c454c 100644 --- a/gcm/health_checks/config/config.toml +++ b/gcm/health_checks/config/config.toml @@ -100,6 +100,16 @@ "log_resource_attributes={'key1': 'val1'}", ] +[health_checks.check-amd-smi] + cluster = "cluster_name" + type = "nagios" + log_folder = "healthchecks" + log_level = "INFO" + sink = "otel" + 
sink_opts = [ + "log_resource_attributes={'key1': 'val1'}", + ] + [health_checks.check-hca] cluster = "cluster_name" type = "nagios" @@ -141,6 +151,16 @@ "log_resource_attributes={'key1': 'val1'}", ] +[health_checks.check-rccl] + cluster = "cluster_name" + type = "nagios" + log_folder = "healthchecks" + log_level = "INFO" + sink = "otel" + sink_opts = [ + "log_resource_attributes={'key1': 'val1'}", + ] + [health_checks.check-service.service-status] cluster = "cluster_name" type = "nagios" diff --git a/gcm/health_checks/config/feature_example.toml b/gcm/health_checks/config/feature_example.toml index 9de8b63..e991480 100644 --- a/gcm/health_checks/config/feature_example.toml +++ b/gcm/health_checks/config/feature_example.toml @@ -25,6 +25,7 @@ disable_slurm_cluster_avail = true disable_service_status = true disable_package_version = true disable_nccl_tests = true +disable_rccl_tests = true disable_nvidia_smi = false disable_nvidia_smi_gpu_num = true disable_nvidia_smi_clock_freq = true @@ -39,6 +40,13 @@ disable_dcgmi_diag = true disable_dcgmi_nvlink = true disable_dcgmi_nvlink_error = true disable_dcgmi_nvlink_status = true +disable_amd_smi = false +disable_amd_smi_gpu_num = true +disable_amd_smi_clock_freq = true +disable_amd_smi_running_procs = true +disable_amd_smi_running_procs_and_kill = true +disable_amd_smi_gpu_temp = true +disable_amd_smi_mem_usage = true disable_check_ibstat = true disable_check_ib_interfaces = true disable_pass_status = true diff --git a/gcm/health_checks/device_telemetry_utils.py b/gcm/health_checks/device_telemetry_utils.py index b55c7c0..44d41f7 100644 --- a/gcm/health_checks/device_telemetry_utils.py +++ b/gcm/health_checks/device_telemetry_utils.py @@ -12,10 +12,16 @@ def get_gpu_devices( device_telemetry: DeviceTelemetryClient, type: CHECK_TYPE ) -> List[int]: """Get the list of GPU devices. 
If the type is prolog/epilog only get - the GPU devices that the user has been allocated.""" + the GPU devices that the user has been allocated. + Uses SLURM_JOB_GPUS first, then CUDA_VISIBLE_DEVICES (NVIDIA), then + ROCR_VISIBLE_DEVICES (AMD/ROCm).""" if type == "prolog" or type == "epilog": - devices_env = os.getenv("SLURM_JOB_GPUS") or os.getenv("CUDA_VISIBLE_DEVICES") + devices_env = ( + os.getenv("SLURM_JOB_GPUS") + or os.getenv("CUDA_VISIBLE_DEVICES") + or os.getenv("ROCR_VISIBLE_DEVICES") + ) if not devices_env: return [] else: diff --git a/gcm/monitoring/cli/gcm.py b/gcm/monitoring/cli/gcm.py index f466b89..aa02cec 100644 --- a/gcm/monitoring/cli/gcm.py +++ b/gcm/monitoring/cli/gcm.py @@ -10,6 +10,7 @@ from gcm._version import __version__ from gcm.monitoring.cli import ( nvml_monitor, + rocm_monitor, sacct_backfill, sacct_publish, sacct_running, @@ -36,6 +37,7 @@ def main(detach: bool) -> None: main.add_command(nvml_monitor.main, name="nvml_monitor") +main.add_command(rocm_monitor.main, name="rocm_monitor") main.add_command(sacct_running.main, name="sacct_running") main.add_command(sacct_publish.main, name="sacct_publish") main.add_command(sacct_wrapper.main, name="fsacct") diff --git a/gcm/monitoring/cli/rocm_monitor.py b/gcm/monitoring/cli/rocm_monitor.py new file mode 100644 index 0000000..0daa599 --- /dev/null +++ b/gcm/monitoring/cli/rocm_monitor.py @@ -0,0 +1,394 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+"""Collects AMD GPU metrics via ROCm (amd-smi/rocm-smi) and publishes to sinks.""" + +import itertools +import json +import logging +import os +import socket +from copy import copy +from dataclasses import asdict, dataclass, field +from typing import ( + Callable, + Collection, + Dict, + Iterable, + List, + Literal, + Mapping, + Optional, + Protocol, + runtime_checkable, + Tuple, +) + +import click +from gcm.exporters import registry +from gcm.monitoring.accumulate import Accumulator +from gcm.monitoring.click import ( + click_default_cmd, + cluster_option, + DynamicEpilogCommand, + EpilogFormatter, + get_docs_for_references, + get_docs_for_registry, + heterogeneous_cluster_v1_option, + interval_option, + log_folder_option, + log_level_option, + once_option, + sink_option, + sink_opts_option, + stdout_option, +) +from gcm.monitoring.clock import Clock, ClockImpl +from gcm.monitoring.dataclass_utils import max_fields +from gcm.monitoring.device_telemetry_client import ( + DeviceTelemetryClient, + DeviceTelemetryException, + GPUDevice, +) +from gcm.monitoring.device_telemetry_rocm import ROCmDeviceTelemetryClient +from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams, SinkImpl +from gcm.monitoring.sink.utils import Factory, HasRegistry +from gcm.monitoring.utils import error + +from gcm.monitoring.utils.monitor import init_logger +from gcm.monitoring.utils.shell import get_command_output +from gcm.schemas.device_metrics import DeviceMetrics, DevicePlusJobMetrics +from gcm.schemas.host_metrics import HostMetrics +from gcm.schemas.indexed_device_metrics import IndexedDeviceMetrics +from gcm.schemas.job_info import JobInfo +from gcm.schemas.log import Log +from omegaconf import OmegaConf as oc +from typeguard import typechecked + +LOGGER_NAME = "rocm_monitor" + +log_error = error.log_error(logger_name=LOGGER_NAME) +logger: logging.Logger # initialization in main() + + +def get_device_metrics_basic(handle: GPUDevice) -> DeviceMetrics: + """Retrieve the 
device metrics.""" + metrics = DeviceMetrics(mem_used_percent=-1) + + utilization = log_error(handle.get_utilization_rates)() + if utilization is not None: + metrics.mem_util = utilization.memory + metrics.gpu_util = utilization.gpu + + memory_info = log_error(handle.get_memory_info)() + if memory_info is not None: + memory_total = memory_info.total / (1024 * 1024) + memory_used = memory_info.used / (1024 * 1024) + if memory_total > 0: + metrics.mem_used_percent = int(memory_used / memory_total * 100) + else: + metrics.mem_used_percent = 0 + + metrics.temperature = log_error(handle.get_temperature)() + + power_draw = log_error(handle.get_power_usage)() + power_limit = log_error(handle.get_enforced_power_limit)() + if power_draw is not None: + metrics.power_draw = power_draw + if power_limit is not None and power_limit > 0: + metrics.power_used_percent = int(power_draw / power_limit * 100) + + @log_error + def get_retired_pages_count(source: Callable[[], Iterable[int]]) -> int: + try: + return len(list(source())) + except DeviceTelemetryException: + return 0 + + metrics.retired_pages_count_single_bit = get_retired_pages_count( + handle.get_retired_pages_multiple_single_bit_ecc_errors + ) + metrics.retired_pages_count_double_bit = get_retired_pages_count( + handle.get_retired_pages_double_bit_ecc_error + ) + + return metrics + + +def read_environ_from_proc( + process_id: int, + *, + run_cmd: Callable[[List[str]], str] = get_command_output, +) -> Dict[str, str]: + cmd = ["sudo", "cat", f"/proc/{process_id}/environ"] + env_vars_raw = run_cmd(cmd).split("\x00") + return dict(v.split("=", maxsplit=1) for v in env_vars_raw if v != "") + + +ProcessId = int +Env = Mapping[str, str] +EnvReader = Callable[[ProcessId], Env] + + +@log_error +def retrieve_job_on_gpu( + handle: GPUDevice, + *, + env_reader: EnvReader = read_environ_from_proc, +) -> Optional[JobInfo]: + """Retrieve the SLURM Job info for the job running on the GPU.""" + processes = 
log_error(handle.get_compute_processes)() + if processes is None or len(processes) == 0: + return None + + env = env_reader(processes[0].pid) + return JobInfo.from_env(env) + + +def get_ram_utilization( + *, get_command_output: Callable[[List[str]], str] = get_command_output +) -> float: + """Show the RAM utilization for the host.""" + text = get_command_output(["free", "-m"]) + lines = text.split("\n") + + header = " ".join(lines[0].split()) + ram_info = " ".join(lines[1].split()) + ram_info = ram_info[ram_info.find(":") + 2 :] # noqa: E203 + + header_parts = header.split(" ") + ram_info_parts = ram_info.split(" ") + + ram_stats = {} + for i, key in enumerate(header_parts): + ram_stats[key] = int(ram_info_parts[i]) + + ram_utilization = ram_stats["used"] / ram_stats["total"] + + return ram_utilization + + +def compute_host_level_metrics( + metrics: Iterable[DeviceMetrics], + get_ram_utilization: Callable[[], float], +) -> HostMetrics: + gpu_utils = list(filter(None, (metric.gpu_util for metric in metrics))) + max_gpu_util = max(gpu_utils, default=0) + min_gpu_util = min(gpu_utils, default=0) + avg_gpu_util = sum(gpu_utils) / len(gpu_utils) if len(gpu_utils) > 0 else 0.0 + + ram_util = get_ram_utilization() + + host_level_metrics = HostMetrics( + max_gpu_util=max_gpu_util, + min_gpu_util=min_gpu_util, + avg_gpu_util=avg_gpu_util, + ram_util=ram_util, + ) + + return host_level_metrics + + +def log_setup( + log_folder: str, + hostname: str, + device_count: int, + log_stdout: bool, + log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], +) -> Tuple[logging.Logger, logging.Handler, List[logging.Formatter]]: + logger_out, handler = init_logger( + logger_name=LOGGER_NAME, + log_dir=os.path.join(log_folder, LOGGER_NAME + "_logs"), + log_name=hostname + ".log", + log_formatter=logging.Formatter("[%(asctime)s] - [General] - %(message)s"), + log_stdout=log_stdout, + log_level=getattr(logging, log_level), + ) + + gpu_specific_formatter = [ + 
logging.Formatter("[%(asctime)s] - [GPU# {}] - %(message)s".format(idx)) + for idx in range(device_count) + ] + + return logger_out, handler, gpu_specific_formatter + + +@runtime_checkable +class CliObject(EpilogFormatter, HasRegistry[SinkImpl], Protocol): + def get_device_telemetry(self) -> DeviceTelemetryClient: ... + + @property + def clock(self) -> Clock: ... + + def read_env(self, process_id: int) -> Env: ... + + def get_ram_utilization(self) -> float: ... + + def get_hostname(self) -> str: ... + + def looptimes(self, once: bool) -> Iterable[int]: ... + + +@dataclass +class CliObjectImpl: + registry: Mapping[str, Factory[SinkImpl]] = field(default_factory=lambda: registry) + clock: Clock = field(default_factory=ClockImpl) + + def get_device_telemetry(self) -> DeviceTelemetryClient: + return ROCmDeviceTelemetryClient() + + def read_env(self, process_id: int) -> Env: + return read_environ_from_proc(process_id) + + def get_ram_utilization(self) -> float: + return get_ram_utilization() + + def get_hostname(self) -> str: + return socket.gethostname().replace(".maas", "") + + def format_epilog(self) -> str: + return get_docs_for_registry(self.registry) + get_docs_for_references( + [ + "https://omegaconf.readthedocs.io/en/2.2_branch/usage.html#from-a-dot-list", + ] + ) + + def looptimes(self, once: bool) -> Iterable[int]: + if once: + return range(1) + return itertools.count(0) + + +class CustomCommand( + DynamicEpilogCommand[CliObject], + obj_cls=CliObject, # type: ignore[type-abstract] +): + pass + + +_default_obj: CliObject = CliObjectImpl() + + +@click_default_cmd(cls=CustomCommand, context_settings={"obj": _default_obj}) +@cluster_option +@sink_option +@sink_opts_option +@log_level_option +@log_folder_option +@stdout_option +@heterogeneous_cluster_v1_option +@click.option( + "--push-interval", + type=click.IntRange(min=1), + default=60, + help="The interval in seconds to push metrics.", +) +@click.option( + "--collect-interval", + type=click.IntRange(min=1), + 
default=10, + help="The interval in seconds to collect telemetry data.", +) +@interval_option(default=90) +@click.option( + "--stdout", + is_flag=True, + help="Whether to display metric information to stdout.", +) +@once_option +@click.pass_obj +@typechecked +def main( + obj: CliObject, + cluster: Optional[str], + sink: str, + sink_opts: Collection[str], + log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + log_folder: str, + heterogeneous_cluster_v1: bool, + push_interval: int, + collect_interval: int, + stdout: bool, + interval: int, + once: bool, +) -> None: + """Script for reading AMD GPU metrics on the node via ROCm (amd-smi/rocm-smi).""" + global logger + + device_telemetry = obj.get_device_telemetry() + + device_count = device_telemetry.get_device_count() + + hostname = obj.get_hostname() + + logger, log_handler, gpu_specific_formatter = log_setup( + log_folder, hostname, device_count, stdout, log_level + ) + + sink_impl = obj.registry[sink](**oc.from_dotlist(list(sink_opts))) + + for _ in obj.looptimes(once): + run_st_time = obj.clock.monotonic() + + job_per_device_collection: Dict[int, JobInfo] = { + gpu: JobInfo() for gpu in range(device_count) + } + accumulators = [ + Accumulator[DeviceMetrics](max_fields(DeviceMetrics)) + for _ in range(device_count) + ] + + while obj.clock.monotonic() - run_st_time < push_interval: + for device_index, accumulator in enumerate(accumulators): + log_handler.setFormatter(gpu_specific_formatter[device_index]) + + handle = log_error(device_telemetry.get_device_by_index)(device_index) + if handle is None: + continue + + accumulator.tell(get_device_metrics_basic(handle)) + + maybe_job_info = retrieve_job_on_gpu( + handle, + env_reader=obj.read_env, + ) + if maybe_job_info is not None: + job_per_device_collection[device_index] = copy(maybe_job_info) + + obj.clock.sleep(collect_interval) + + log_time = obj.clock.unixtime() + indexed_device_metrics: List[IndexedDeviceMetrics] = [] + + for index, metrics in 
enumerate(a.ask() for a in accumulators): + indexed_device_metrics.append( + IndexedDeviceMetrics(gpu_index=index, **asdict(metrics)) + ) + + device_plus_job_metrics = DevicePlusJobMetrics( + gpu_id=index, + hostname=hostname, + **asdict(metrics), + **asdict(job_per_device_collection[index]), + ) + + sink_impl.write( + Log(ts=log_time, message=[device_plus_job_metrics]), + additional_params=SinkAdditionalParams(data_type=DataType.LOG), + ) + logger.debug(json.dumps(asdict(device_plus_job_metrics), sort_keys=True)) + + host_level_metrics = compute_host_level_metrics( + (a.ask() for a in accumulators), obj.get_ram_utilization + ) + logger.debug(json.dumps(asdict(host_level_metrics), sort_keys=True)) + + sink_impl.write( + Log( + ts=log_time, + message=indexed_device_metrics + [host_level_metrics], + ), + additional_params=SinkAdditionalParams(data_type=DataType.METRIC), + ) + + obj.clock.sleep(max(0, interval - (obj.clock.monotonic() - run_st_time))) diff --git a/gcm/monitoring/device_telemetry_rocm.py b/gcm/monitoring/device_telemetry_rocm.py new file mode 100644 index 0000000..d563e23 --- /dev/null +++ b/gcm/monitoring/device_telemetry_rocm.py @@ -0,0 +1,418 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +"""ROCm/AMD GPU device telemetry via amd-smi or rocm-smi (subprocess + JSON).""" + +from __future__ import annotations + +import json +import logging +import shutil +import subprocess +from typing import Any, Dict, Iterable, List, Optional + +from gcm.monitoring.device_telemetry_client import ( + ApplicationClockInfo, + DeviceTelemetryException, + GPUMemory, + GPUUtilization, + ProcessInfo, + RemappedRowInfo, +) + +logger = logging.getLogger(__name__) + +# Default vbios string when not available from ROCm (satisfies protocol). 
+_AMD_VBIOS_PLACEHOLDER = "AMD-ROCm" + + +def _run_cmd(args: List[str], timeout_secs: int = 30) -> str: + try: + out = subprocess.run( + args, + capture_output=True, + text=True, + timeout=timeout_secs, + ) + except FileNotFoundError as e: + raise DeviceTelemetryException( + f"ROCm tool not found: {args[0]}. Is amd-smi or rocm-smi installed?" + ) from e + except subprocess.TimeoutExpired as e: + raise DeviceTelemetryException(f"Command timed out: {args}") from e + if out.returncode != 0 and out.stderr: + raise DeviceTelemetryException( + f"Command failed (exit {out.returncode}): {out.stderr.strip() or out.stdout.strip()}" + ) + return out.stdout or "" + + +def _extract_json_from_stdout(raw: str) -> str: + """Extract JSON from amd-smi stdout; it may print warnings or info before the JSON.""" + raw = (raw or "").strip() + if not raw: + return raw + # Find first { or [ (amd-smi typically returns an object). + start_obj = raw.find("{") + start_arr = raw.find("[") + if start_obj == -1 and start_arr == -1: + return raw + if start_obj == -1: + start, open_ch, close_ch = start_arr, "[", "]" + elif start_arr == -1: + start, open_ch, close_ch = start_obj, "{", "}" + else: + start = min(start_obj, start_arr) + open_ch = raw[start] + close_ch = "]" if open_ch == "[" else "}" + depth = 0 + for i in range(start, len(raw)): + if raw[i] == open_ch: + depth += 1 + elif raw[i] == close_ch: + depth -= 1 + if depth == 0: + return raw[start : i + 1] + return raw[start:] + + +def _find_rocm_tool() -> Optional[str]: + """Return 'amd-smi' or 'rocm-smi' if available, else None.""" + for name in ("amd-smi", "rocm-smi"): + if shutil.which(name): + return name + return None + + +class ROCmGPUDevice: + """Per-device telemetry for one AMD GPU; implements GPUDevice protocol.""" + + def __init__( + self, + index: int, + metrics: Dict[str, Any], + memory: Dict[str, Any], + processes: List[Dict[str, Any]], + ) -> None: + self._index = index + self._metrics = metrics or {} + self._memory = memory or 
{} + self._processes = processes or [] + + def get_compute_processes(self) -> List[ProcessInfo]: + out: List[ProcessInfo] = [] + for p in self._processes: + pid = p.get("pid") or p.get("process_id") + mem = p.get("used_gpu_memory") or p.get("memory") or 0 + if pid is not None: + out.append(ProcessInfo(pid=int(pid), usedGpuMemory=int(mem))) + return out + + def get_retired_pages_double_bit_ecc_error(self) -> Iterable[int]: + return [] + + def get_retired_pages_multiple_single_bit_ecc_errors(self) -> Iterable[int]: + return [] + + def get_retired_pages_pending_status(self) -> int: + return 0 + + def get_remapped_rows(self) -> RemappedRowInfo: + return RemappedRowInfo( + correctable=0, uncorrectable=0, pending=0, failure=0 + ) + + def get_ecc_uncorrected_volatile_total(self) -> int: + return 0 + + def get_ecc_corrected_volatile_total(self) -> int: + return 0 + + def get_enforced_power_limit(self) -> Optional[int]: + val = self._metrics.get("average_socket_power_cap") or self._metrics.get( + "power_cap" + ) + if val is not None: + try: + return int(val) + except (TypeError, ValueError): + pass + return None + + def get_power_usage(self) -> Optional[int]: + val = self._metrics.get("average_socket_power") or self._metrics.get( + "current_socket_power" + ) + if val is not None: + try: + return int(val) + except (TypeError, ValueError): + pass + return None + + def get_temperature(self) -> int: + # Prefer hotspot, then edge (Celsius). 
+ val = ( + self._metrics.get("temperature_hotspot") + or self._metrics.get("temperature_edge") + or self._metrics.get("temperature") + ) + if val is not None: + try: + return int(val) + except (TypeError, ValueError): + pass + return 0 + + def get_memory_info(self) -> GPUMemory: + total = self._memory.get("total") or self._memory.get("total_memory") or 0 + free = self._memory.get("free") or self._memory.get("free_memory") or 0 + used = self._memory.get("used") or self._memory.get("used_memory") + if used is None and total is not None and free is not None: + used = total - free + elif used is None: + used = 0 + # amd-smi/rocm-smi often report in MB; convert to bytes for schema. + if isinstance(total, (int, float)) and total < 1e7: + total = int(total) * 1024 * 1024 + if isinstance(free, (int, float)) and free < 1e7: + free = int(free) * 1024 * 1024 + if isinstance(used, (int, float)) and used < 1e7: + used = int(used) * 1024 * 1024 + return GPUMemory(total=int(total), free=int(free), used=int(used)) + + def get_utilization_rates(self) -> GPUUtilization: + gpu = self._metrics.get("average_gfx_activity") or self._metrics.get( + "gpu_activity" + ) or 0 + mem = self._metrics.get("average_umc_activity") or self._metrics.get( + "memory_activity" + ) or 0 + try: + gpu_pct = int(float(gpu) * 100) if float(gpu) <= 1.0 else int(gpu) + except (TypeError, ValueError): + gpu_pct = 0 + try: + mem_pct = int(float(mem) * 100) if float(mem) <= 1.0 else int(mem) + except (TypeError, ValueError): + mem_pct = 0 + return GPUUtilization(gpu=gpu_pct, memory=mem_pct) + + def get_clock_freq(self) -> ApplicationClockInfo: + gfx = self._metrics.get("current_gfxclk") or self._metrics.get( + "average_gfxclk" + ) or 0 + mem = self._metrics.get("current_uclk") or self._metrics.get( + "average_uclk" + ) or self._metrics.get("current_memclk") or 0 + try: + gfx_int = int(gfx) + except (TypeError, ValueError): + gfx_int = 0 + try: + mem_int = int(mem) + except (TypeError, ValueError): + mem_int = 0 
+ return ApplicationClockInfo( + graphics_freq=gfx_int, memory_freq=mem_int + ) + + def get_vbios_version(self) -> str: + return self._memory.get("vbios_version") or self._metrics.get( + "vbios_version" + ) or _AMD_VBIOS_PLACEHOLDER + + +class ROCmDeviceTelemetryClient: + """DeviceTelemetryClient implementation using amd-smi or rocm-smi.""" + + def __init__(self, tool_path: Optional[str] = None, timeout_secs: int = 30) -> None: + self._tool: Optional[str] = tool_path or _find_rocm_tool() + self._timeout = timeout_secs + self._device_count: Optional[int] = None + self._cache: Dict[int, ROCmGPUDevice] = {} + + def _ensure_init(self) -> None: + if self._tool is None: + raise DeviceTelemetryException( + "No ROCm tool found. Install amd-smi or rocm-smi and ensure it is on PATH." + ) + if self._device_count is not None: + return + try: + if self._tool == "amd-smi": + self._device_count = self._amd_smi_get_count() + else: + self._device_count = self._rocm_smi_get_count() + except Exception as e: + if isinstance(e, DeviceTelemetryException): + raise + raise DeviceTelemetryException(str(e)) from e + + def _amd_smi_get_count(self) -> int: + out = _run_cmd([self._tool, "list", "--json"], self._timeout) + json_str = _extract_json_from_stdout(out) + if not json_str: + raise DeviceTelemetryException( + "amd-smi list --json produced no JSON output. Check amd-smi and GPU visibility." + ) + data = json.loads(json_str) + # Format: {"system": {"host_driver_version": "...", "gpus": [...]}} or {"gpus": [...]} + # depending on version; some amd-smi versions return a top-level array of GPU objects. + if isinstance(data, list): + gpus = data + else: + gpus = data.get("gpus") or data.get("system", {}).get("gpus") or [] + if isinstance(gpus, dict): + gpus = list(gpus.values()) if gpus else [] + return len(gpus) + + def _rocm_smi_get_count(self) -> int: + # rocm-smi -i (show id) or -a; count GPUs from output or --json if supported. 
+ out = _run_cmd([self._tool, "-i"], self._timeout) + count = 0 + for line in out.splitlines(): + if "GPU[" in line or "card" in line.lower() or "device" in line.lower(): + count += 1 + if count == 0: + # Try rocm-smi --showproductname (one line per GPU). + out2 = _run_cmd([self._tool, "--showproductname"], self._timeout) + count = max(1, out2.strip().count("\n") + 1) if out2.strip() else 1 + return max(1, count) + + def get_device_count(self) -> int: + self._ensure_init() + assert self._device_count is not None + return self._device_count + + def _get_device_data(self, index: int) -> ROCmGPUDevice: + if index in self._cache: + return self._cache[index] + self._ensure_init() + if index < 0 or index >= (self._device_count or 0): + raise DeviceTelemetryException(f"Invalid GPU index: {index}") + try: + if self._tool == "amd-smi": + dev = self._amd_smi_get_device(index) + else: + dev = self._rocm_smi_get_device(index) + except Exception as e: + if isinstance(e, DeviceTelemetryException): + raise + raise DeviceTelemetryException(str(e)) from e + self._cache[index] = dev + return dev + + def _amd_smi_get_device(self, index: int) -> ROCmGPUDevice: + metrics: Dict[str, Any] = {} + memory: Dict[str, Any] = {} + processes: List[Dict[str, Any]] = [] + try: + out = _run_cmd( + [self._tool, "metric", "--gpu", str(index), "--json"], + self._timeout, + ) + data = json.loads(_extract_json_from_stdout(out)) + # Nested by gpu_id or in a list. 
+ if isinstance(data, dict): + metrics = data.get(str(index)) or data.get("gpu_metrics") or data + elif isinstance(data, list) and len(data) > index: + metrics = data[index] if isinstance(data[index], dict) else {} + except (json.JSONDecodeError, DeviceTelemetryException): + pass + try: + out = _run_cmd( + [self._tool, "static", "-v", "--gpu", str(index), "--json"], + self._timeout, + ) + data = json.loads(_extract_json_from_stdout(out)) + vram = data.get("vram") or data.get("VRAM") or {} + if isinstance(vram, dict): + memory = vram + elif isinstance(data, dict): + memory = data + except (json.JSONDecodeError, DeviceTelemetryException): + # Derive from metric if available (e.g. mem_used / total). + pass + try: + out = _run_cmd( + [self._tool, "process", "--gpu", str(index), "--json"], + self._timeout, + ) + data = json.loads(_extract_json_from_stdout(out)) + if isinstance(data, list): + processes = data + elif isinstance(data, dict): + processes = data.get("processes") or data.get("process_list") or [] + except (json.JSONDecodeError, DeviceTelemetryException): + pass + return ROCmGPUDevice(index, metrics, memory, processes) + + def _rocm_smi_get_device(self, index: int) -> ROCmGPUDevice: + metrics: Dict[str, Any] = {} + memory: Dict[str, Any] = {} + processes: List[Dict[str, Any]] = [] + # rocm-smi --showmeminfo vram --showtemp --showuse -d N (or all). 
+ try: + out = _run_cmd( + [self._tool, "--showmeminfo", "vram", "-d", str(index)], + self._timeout, + ) + for line in out.splitlines(): + if "VRAM Total Memory (B)" in line or "Total" in line: + parts = line.split(":") + if len(parts) >= 2: + try: + memory["total"] = int(parts[-1].strip().split()[0]) + except (ValueError, IndexError): + pass + if "VRAM Total Used Memory (B)" in line or "Used" in line: + parts = line.split(":") + if len(parts) >= 2: + try: + memory["used"] = int(parts[-1].strip().split()[0]) + except (ValueError, IndexError): + pass + if "total" in memory and "used" in memory: + memory["free"] = memory["total"] - memory["used"] + except DeviceTelemetryException: + pass + try: + out = _run_cmd( + [self._tool, "--showtemp", "-d", str(index)], + self._timeout, + ) + for line in out.splitlines(): + if "GPU temperature" in line or "Temperature" in line: + parts = line.split(":") + if len(parts) >= 2: + try: + metrics["temperature"] = int( + parts[-1].strip().replace("C", "").split()[0] + ) + break + except (ValueError, IndexError): + pass + except DeviceTelemetryException: + pass + try: + out = _run_cmd( + [self._tool, "--showuse", "-d", str(index)], + self._timeout, + ) + for line in out.splitlines(): + if "GPU use" in line or "Gpu use" in line: + parts = line.split(":") + if len(parts) >= 2: + try: + metrics["average_gfx_activity"] = ( + int(parts[-1].strip().replace("%", "").split()[0]) / 100.0 + ) + break + except (ValueError, IndexError): + pass + except DeviceTelemetryException: + pass + return ROCmGPUDevice(index, metrics, memory, processes) + + def get_device_by_index(self, index: int) -> ROCmGPUDevice: + return self._get_device_data(index) diff --git a/gcm/monitoring/features/feature_definitions/health_checks_features.py b/gcm/monitoring/features/feature_definitions/health_checks_features.py index e472da9..eada689 100644 --- a/gcm/monitoring/features/feature_definitions/health_checks_features.py +++ 
b/gcm/monitoring/features/feature_definitions/health_checks_features.py
@@ -29,6 +29,7 @@ class HealthChecksFeatures:
     disable_service_status: bool
     disable_package_version: bool
     disable_nccl_tests: bool
+    disable_rccl_tests: bool
     disable_nvidia_smi: bool
     disable_nvidia_smi_gpu_num: bool
     disable_nvidia_smi_clock_freq: bool
@@ -47,6 +48,13 @@ class HealthChecksFeatures:
     disable_dcgmi_nvlink: bool
     disable_dcgmi_nvlink_error: bool
     disable_dcgmi_nvlink_status: bool
+    disable_amd_smi: bool
+    disable_amd_smi_gpu_num: bool
+    disable_amd_smi_clock_freq: bool
+    disable_amd_smi_running_procs: bool
+    disable_amd_smi_running_procs_and_kill: bool
+    disable_amd_smi_gpu_temp: bool
+    disable_amd_smi_mem_usage: bool
     disable_check_ibstat: bool
     disable_check_ib_interfaces: bool
     disable_check_iblink: bool
diff --git a/gcm/monitoring/features/gen/generated_features_healthchecksfeatures.py b/gcm/monitoring/features/gen/generated_features_healthchecksfeatures.py
index ec186ee..198a283 100644
--- a/gcm/monitoring/features/gen/generated_features_healthchecksfeatures.py
+++ b/gcm/monitoring/features/gen/generated_features_healthchecksfeatures.py
@@ -36,7 +36,9 @@ def load_config(self) -> Dict[str, Any]:
                 f"Error reading toml file. {FeatureValueHealthChecksFeatures.config_path} does not contain valid TOML. Error: {e}"
             )
             raise tomli.TOMLDecodeError(
-                f"{FeatureValueHealthChecksFeatures.config_path} does not contain valid TOML.",
+                msg=f"{FeatureValueHealthChecksFeatures.config_path} does not contain valid TOML. {e.msg}",
+                doc=e.doc,
+                pos=e.pos,
             ) from e
         else:
             raise ValueError(
@@ -423,6 +425,20 @@ def get_healthchecksfeatures_disable_nccl_tests(self) -> bool:
         )
         return value
 
+    def get_healthchecksfeatures_disable_rccl_tests(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_rccl_tests", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_rccl_tests, got {type(value).__name__} instead."
+            )
+        return value
+
     def get_healthchecksfeatures_disable_nvidia_smi(self) -> bool:
         try:
             features = self.load_config()
@@ -677,6 +693,106 @@ def get_healthchecksfeatures_disable_dcgmi_nvlink_status(self) -> bool:
         )
         return value
 
+    def get_healthchecksfeatures_disable_amd_smi(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi, got {type(value).__name__} instead."
+            )
+        return value
+
+    def get_healthchecksfeatures_disable_amd_smi_gpu_num(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi_gpu_num", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi_gpu_num, got {type(value).__name__} instead."
+            )
+        return value
+
+    def get_healthchecksfeatures_disable_amd_smi_clock_freq(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi_clock_freq", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi_clock_freq, got {type(value).__name__} instead."
+            )
+        return value
+
+    def get_healthchecksfeatures_disable_amd_smi_running_procs(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi_running_procs", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi_running_procs, got {type(value).__name__} instead."
+            )
+        return value
+
+    def get_healthchecksfeatures_disable_amd_smi_running_procs_and_kill(
+        self,
+    ) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi_running_procs_and_kill", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi_running_procs_and_kill, got {type(value).__name__} instead."
+            )
+        return value
+
+    def get_healthchecksfeatures_disable_amd_smi_gpu_temp(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi_gpu_temp", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi_gpu_temp, got {type(value).__name__} instead."
+            )
+        return value
+
+    def get_healthchecksfeatures_disable_amd_smi_mem_usage(self) -> bool:
+        try:
+            features = self.load_config()
+        except Exception:
+            return False
+        value = features.get("HealthChecksFeatures", {}).get(
+            "disable_amd_smi_mem_usage", False
+        )
+        if not isinstance(value, bool):
+            raise TypeError(
+                f"Expected bool value for HealthChecksFeatures.disable_amd_smi_mem_usage, got {type(value).__name__} instead."
+            )
+        return value
+
     def get_healthchecksfeatures_disable_check_ibstat(self) -> bool:
         try:
             features = self.load_config()
diff --git a/gcm/monitoring/features/gen/generated_features_testingfeatures.py b/gcm/monitoring/features/gen/generated_features_testingfeatures.py
index dac1719..8f71bde 100644
--- a/gcm/monitoring/features/gen/generated_features_testingfeatures.py
+++ b/gcm/monitoring/features/gen/generated_features_testingfeatures.py
@@ -36,7 +36,9 @@ def load_config(self) -> Dict[str, Any]:
                 f"Error reading toml file. {FeatureValueTestingFeatures.config_path} does not contain valid TOML. Error: {e}"
             )
             raise tomli.TOMLDecodeError(
-                f"{FeatureValueTestingFeatures.config_path} does not contain valid TOML.",
+                msg=f"{FeatureValueTestingFeatures.config_path} does not contain valid TOML. {e.msg}",
+                doc=e.doc,
+                pos=e.pos,
             ) from e
         else:
             raise ValueError(
diff --git a/gcm/schemas/health_check/health_check_name.py b/gcm/schemas/health_check/health_check_name.py
index df58408..fba7de2 100644
--- a/gcm/schemas/health_check/health_check_name.py
+++ b/gcm/schemas/health_check/health_check_name.py
@@ -33,6 +33,7 @@ class HealthCheckName(Enum):
     PACKAGE_VERSION = "package version"
     AIRSTORE_CREDENTIAL_COUNT = "airstore credential count"
     NCCL_TESTS = "nccl-tests"
+    RCCL_TESTS = "rccl-tests"
    NVIDIA_SMI = "nvidia smi"
     NVIDIA_SMI_GPU_NUM = "nvidia smi gpu_num"
     NVIDIA_SMI_CLOCK_FREQ = "nvidia smi clock_freq"
@@ -51,6 +52,13 @@ class HealthCheckName(Enum):
     DCGMI_NVLINK = "check dcgmi nvlink"
     DCGMI_NVMLINK_ERROR = "check dcgmi nvlink error"
     DCGMI_NVMLINK_STATUS = "check dcgmi nvlink status"
+    AMD_SMI = "amd smi"
+    AMD_SMI_GPU_NUM = "amd smi gpu_num"
+    AMD_SMI_CLOCK_FREQ = "amd smi clock_freq"
+    AMD_SMI_RUNNING_PROCS = "amd smi running_procs"
+    AMD_SMI_RUNNING_PROCS_AND_KILL = "amd smi running_procs_and_kill"
+    AMD_SMI_GPU_TEMP = "amd smi gpu_temp"
+    AMD_SMI_MEM_USAGE = "amd smi mem_usage"
     CHECK_IBSTAT = "check ibstat"
     CHECK_IB_INTERFACES = "check ib interfaces"
     CHECK_IBLINK = "check iblink"
diff --git a/gcm/tests/health_checks_tests/test_amd_smi.py b/gcm/tests/health_checks_tests/test_amd_smi.py
new file mode 100644
index 0000000..100e742
--- /dev/null
+++ b/gcm/tests/health_checks_tests/test_amd_smi.py
@@ -0,0 +1,78 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+"""Tests for AMD SMI health check (check_amd_smi)."""
+
+from dataclasses import dataclass, field
+from pathlib import Path
+
+import pytest
+from click.testing import CliRunner
+
+from gcm.health_checks.checks.check_amd_smi import AmdSmiCli, check_amd_smi
+from gcm.health_checks.types import ExitCode
+from gcm.monitoring.device_telemetry_client import DeviceTelemetryClient, GPUDevice
+from gcm.tests.fakes import FakeGPUDevice
+
+
+@dataclass
+class FakeAmdSmiCliObject:
+    cluster: str
+    type: str
+    log_level: str
+    log_folder: str
+    device_telemetry_client: DeviceTelemetryClient
+
+    def get_device_telemetry(self) -> DeviceTelemetryClient:
+        return self.device_telemetry_client
+
+
+def test_check_amd_smi_gpu_num_ok(
+    caplog: pytest.LogCaptureFixture,
+    tmp_path: Path,
+) -> None:
+    """check_amd_smi gpu_num passes when device count matches expected."""
+    class FakeDeviceTelemetryClient:
+        devices: list = field(default_factory=lambda: [FakeGPUDevice()] * 8)
+
+        def get_device_count(self) -> int:
+            return 8
+
+        def get_device_by_index(self, index: int) -> GPUDevice:
+            return self.devices[index]
+
+    fake_obj: AmdSmiCli = FakeAmdSmiCliObject(
+        "cluster", "type", "log_level", str(tmp_path), FakeDeviceTelemetryClient()
+    )
+    runner = CliRunner(mix_stderr=False)
+    result = runner.invoke(
+        check_amd_smi,
+        f"fair_cluster nagios --log-folder={tmp_path} --sink=do_nothing -c gpu_num --gpu_num=8",
+        obj=fake_obj,
+    )
+    assert result.exit_code == ExitCode.OK.value
+    assert "Number of GPUs present is the same as expected, 8" in caplog.text
+
+
+def test_check_amd_smi_gpu_num_critical(
+    caplog: pytest.LogCaptureFixture,
+    tmp_path: Path,
+) -> None:
+    """check_amd_smi gpu_num fails when device count does not match."""
+    class FakeDeviceTelemetryClient:
+        def get_device_count(self) -> int:
+            return 4
+
+        def get_device_by_index(self, index: int) -> GPUDevice:
+            return FakeGPUDevice()
+
+    fake_obj: AmdSmiCli = FakeAmdSmiCliObject(
+        "cluster", "type", "log_level", str(tmp_path), FakeDeviceTelemetryClient()
+    )
+    runner = CliRunner(mix_stderr=False)
+    result = runner.invoke(
+        check_amd_smi,
+        f"fair_cluster nagios --log-folder={tmp_path} --sink=do_nothing -c gpu_num --gpu_num=8",
+        obj=fake_obj,
+    )
+    assert result.exit_code == ExitCode.CRITICAL.value
+    assert "Number of GPUs present, 4, is different than expected, 8" in caplog.text
diff --git a/gcm/tests/health_checks_tests/test_check_rccl.py b/gcm/tests/health_checks_tests/test_check_rccl.py
new file mode 100644
index 0000000..167999a
--- /dev/null
+++ b/gcm/tests/health_checks_tests/test_check_rccl.py
@@ -0,0 +1,190 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+import logging
+import socket
+import subprocess
+from pathlib import Path
+from typing import Any, Optional
+
+import pytest
+from click import BadParameter
+from click.testing import CliRunner
+
+from gcm.health_checks.checks.check_rccl import (
+    check_rccl,
+    Flavor,
+    get_avg_bus_bw,
+    get_hosts,
+    process_rccl_test_output,
+)
+from gcm.health_checks.subprocess import ShellCommandOut
+from gcm.health_checks.types import ExitCode
+from gcm.tests.fakes import FakeShellCommandOut
+
+# RCCL/rccl-tests output format matches nccl-tests (Avg bus bandwidth line)
+SAMPLE_RCCL_SUCCESS_OUTPUT = """
+# RCCL test output (same format as nccl-tests)
+# size count type redop root time algbw busbw
+ 33554432 8388608 float sum -1 344.8 97.32 170.31 0
+# Avg bus bandwidth : 210.99
+#
+"""
+
+SAMPLE_RCCL_FAILURE_OUTPUT = """
+There are not enough slots available in the system to satisfy the 16
+slots that were requested by the application.
+"""
+
+
+def test_get_avg_bus_bw_success() -> None:
+    out = FakeShellCommandOut([], 0, SAMPLE_RCCL_SUCCESS_OUTPUT)
+    assert get_avg_bus_bw(out) == 210.99
+
+
+def test_get_avg_bus_bw_bus_bandwidth_line() -> None:
+    out = FakeShellCommandOut([], 0, "Some output\nBus bandwidth: 100.5\n")
+    assert get_avg_bus_bw(out) == 100.5
+
+
+def test_get_avg_bus_bw_fail_returncode() -> None:
+    out = FakeShellCommandOut([], 1, SAMPLE_RCCL_SUCCESS_OUTPUT)
+    assert get_avg_bus_bw(out) is None
+
+
+def test_get_avg_bus_bw_no_bandwidth_line() -> None:
+    out = FakeShellCommandOut([], 0, "No bandwidth here")
+    assert get_avg_bus_bw(out) is None
+
+
+@pytest.mark.parametrize(
+    "critical_threshold, warn_threshold, expected",
+    [
+        (200, None, ExitCode.OK),
+        (200, 210, ExitCode.OK),
+        (210, 211, ExitCode.WARN),
+        (211, 211, ExitCode.CRITICAL),
+    ],
+)
+def test_process_rccl_test_output(
+    critical_threshold: float,
+    warn_threshold: Optional[float],
+    expected: ExitCode,
+) -> None:
+    out = FakeShellCommandOut([], 0, SAMPLE_RCCL_SUCCESS_OUTPUT)
+    result = process_rccl_test_output(
+        out, "all_reduce", critical_threshold, warn_threshold
+    )
+    assert result.exitcode == expected
+
+
+def test_process_rccl_test_output_failed_run() -> None:
+    out = FakeShellCommandOut([], 1, "error")
+    result = process_rccl_test_output(out, "all_reduce", 10.0, None)
+    assert result.exitcode == ExitCode.WARN
+    assert "FAILED to run" in result.message
+
+
+@pytest.mark.parametrize(
+    "critical_threshold, warn_threshold, expected",
+    [
+        (200, None, ExitCode.OK),
+        (211, None, ExitCode.CRITICAL),
+    ],
+)
+def test_check_rccl_successful(
+    caplog: pytest.LogCaptureFixture,
+    tmp_path: Path,
+    critical_threshold: float,
+    warn_threshold: Optional[float],
+    expected: ExitCode,
+) -> None:
+    runner = CliRunner(mix_stderr=False)
+
+    def mock_runner(cmd: str, timeout: int) -> ShellCommandOut:
+        return FakeShellCommandOut(
+            [],
+            0,
+            SAMPLE_RCCL_SUCCESS_OUTPUT.format(hostname=socket.gethostname()),
+        )
+
+    args = (
+        f"fair_cluster prolog --log-folder={tmp_path} --sink=do_nothing "
+        f"-p all_reduce --rccl-tdir /opt/rccl-tests/build/ --critical-threshold {critical_threshold}"
+    )
+    if warn_threshold is not None:
+        args += f" --warn-threshold {warn_threshold}"
+
+    result = runner.invoke(check_rccl, args, obj=mock_runner)
+    assert result.exit_code == expected.value
+    assert "Avg bus bandwidth" in caplog.text or "RCCL Test" in caplog.text
+
+
+def test_check_rccl_failure(tmp_path: Path) -> None:
+    runner = CliRunner(mix_stderr=False)
+
+    def mock_runner(cmd: str, timeout: int) -> ShellCommandOut:
+        return FakeShellCommandOut([], 0, SAMPLE_RCCL_FAILURE_OUTPUT)
+
+    result = runner.invoke(
+        check_rccl,
+        (
+            f"fair_cluster prolog --log-folder={tmp_path} --sink=do_nothing "
+            "-p all_reduce --rccl-tdir /opt/rccl-tests/build/ --critical-threshold 200"
+        ),
+        obj=mock_runner,
+    )
+    assert result.exit_code == ExitCode.WARN.value
+
+
+def test_check_rccl_exception(caplog: pytest.LogCaptureFixture, tmp_path: Path) -> None:
+    def mock_runner(cmd: str, timeout: int) -> ShellCommandOut:
+        raise subprocess.CalledProcessError(
+            255,
+            "",
+            "Command returned non-zero exit status 255.",
+        )
+
+    runner = CliRunner(mix_stderr=False)
+    caplog.at_level(logging.INFO)
+
+    result = runner.invoke(
+        check_rccl,
+        (
+            f"fair_cluster prolog --log-folder={tmp_path} --sink=do_nothing "
+            "-p all_reduce --rccl-tdir /opt/rccl-tests/build/ --critical-threshold 200"
+        ),
+        obj=mock_runner,
+    )
+
+    assert result.exit_code == ExitCode.WARN.value
+    assert "RCCL Test - all_reduce - FAILED to run." in caplog.text
+
+
+@pytest.mark.parametrize(
+    "flavor, hostlist, expected_result",
+    [
+        ("single", None, [(socket.gethostname(),)]),
+        ("single", "node-100", [("node-100",)]),
+        (
+            "pairwise",
+            "node-[100-101]",
+            [
+                ("node-100", "node-101"),
+            ],
+        ),
+    ],
+)
+def test_get_hosts_success(
+    flavor: Flavor,
+    hostlist: str,
+    expected_result: Any,
+) -> None:
+    logger = logging.getLogger(__name__)
+    result = get_hosts(flavor, hostlist, logger)
+    assert result == expected_result
+
+
+def test_get_hosts_pairwise_requires_hostlist() -> None:
+    logger = logging.getLogger(__name__)
+    with pytest.raises(BadParameter):
+        get_hosts("pairwise", None, logger)
diff --git a/gcm/tests/health_checks_tests/test_device_telemetry_utils.py b/gcm/tests/health_checks_tests/test_device_telemetry_utils.py
new file mode 100644
index 0000000..7ab22e6
--- /dev/null
+++ b/gcm/tests/health_checks_tests/test_device_telemetry_utils.py
@@ -0,0 +1,71 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+"""Tests for get_gpu_devices including ROCR_VISIBLE_DEVICES (AMD/ROCm)."""
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from gcm.health_checks.device_telemetry_utils import get_gpu_devices
+
+
+def test_get_gpu_devices_prolog_slurm_job_gpus(monkeypatch: pytest.MonkeyPatch) -> None:
+    """SLURM_JOB_GPUS takes precedence."""
+    mock_telemetry = MagicMock()
+    mock_telemetry.get_device_count.return_value = 8
+    monkeypatch.setenv("SLURM_JOB_GPUS", "0,1,2")
+    monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
+    monkeypatch.delenv("ROCR_VISIBLE_DEVICES", raising=False)
+    out = get_gpu_devices(mock_telemetry, "prolog")
+    assert out == [0, 1, 2]
+    mock_telemetry.get_device_count.assert_not_called()
+
+
+def test_get_gpu_devices_prolog_rocr_visible_devices(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """ROCR_VISIBLE_DEVICES used when SLURM_JOB_GPUS and CUDA_VISIBLE_DEVICES unset."""
+    mock_telemetry = MagicMock()
+    mock_telemetry.get_device_count.return_value = 4
+    monkeypatch.delenv("SLURM_JOB_GPUS", raising=False)
+    monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
+    monkeypatch.setenv("ROCR_VISIBLE_DEVICES", "2,3")
+    out = get_gpu_devices(mock_telemetry, "epilog")
+    assert out == [2, 3]
+    mock_telemetry.get_device_count.assert_not_called()
+
+
+def test_get_gpu_devices_prolog_cuda_visible_devices(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """CUDA_VISIBLE_DEVICES used when SLURM_JOB_GPUS unset."""
+    mock_telemetry = MagicMock()
+    monkeypatch.delenv("SLURM_JOB_GPUS", raising=False)
+    monkeypatch.setenv("CUDA_VISIBLE_DEVICES", "1,2,3")
+    monkeypatch.delenv("ROCR_VISIBLE_DEVICES", raising=False)
+    out = get_gpu_devices(mock_telemetry, "prolog")
+    assert out == [1, 2, 3]
+
+
+def test_get_gpu_devices_nagios_all_devices(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Non-prolog/epilog returns all device indices."""
+    mock_telemetry = MagicMock()
+    mock_telemetry.get_device_count.return_value = 4
+    monkeypatch.delenv("SLURM_JOB_GPUS", raising=False)
+    monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
+    monkeypatch.delenv("ROCR_VISIBLE_DEVICES", raising=False)
+    out = get_gpu_devices(mock_telemetry, "nagios")
+    assert out == [0, 1, 2, 3]
+    mock_telemetry.get_device_count.assert_called_once()
+
+
+def test_get_gpu_devices_prolog_empty_when_no_env(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Prolog/epilog with no GPU env vars returns empty list."""
+    mock_telemetry = MagicMock()
+    monkeypatch.delenv("SLURM_JOB_GPUS", raising=False)
+    monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
+    monkeypatch.delenv("ROCR_VISIBLE_DEVICES", raising=False)
+    out = get_gpu_devices(mock_telemetry, "epilog")
+    assert out == []
diff --git a/gcm/tests/health_checks_tests/test_killswitches.py b/gcm/tests/health_checks_tests/test_killswitches.py
index beb4b4a..a295ff8 100644
--- a/gcm/tests/health_checks_tests/test_killswitches.py
+++ b/gcm/tests/health_checks_tests/test_killswitches.py
@@ -57,6 +57,7 @@ def _write_to_file(path: Path, data: str) -> Path:
         "check-service service-status -s sth",
         "check-service package-version -p sth -v sth",
         "check-nccl --nccl-tdir=sth -p all_gather --critical-threshold=4",
+        "check-rccl --rccl-tdir=sth -p all_reduce --critical-threshold=4",
         "check-nvidia-smi -c gpu_num",
         "check-nvidia-smi -c running_procs",
         "check-nvidia-smi -c clock_freq",
@@ -111,6 +112,7 @@ def test_killswitches(
     disable_service_status = true
     disable_package_version = true
     disable_nccl_tests = true
+    disable_rccl_tests = true
     disable_nvidia_smi_gpu_num = true
     disable_nvidia_smi_clock_freq = true
     disable_nvidia_smi_running_procs = true
diff --git a/gcm/tests/test_device_telemetry_rocm.py b/gcm/tests/test_device_telemetry_rocm.py
new file mode 100644
index 0000000..2435671
--- /dev/null
+++ b/gcm/tests/test_device_telemetry_rocm.py
@@ -0,0 +1,102 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+"""Tests for ROCm device telemetry client."""
+
+import json
+from unittest.mock import patch
+
+import pytest
+
+from gcm.monitoring.device_telemetry_client import DeviceTelemetryException
+from gcm.monitoring.device_telemetry_rocm import (
+    ROCmDeviceTelemetryClient,
+    ROCmGPUDevice,
+    _run_cmd,
+)
+
+
+def test_rocm_client_no_tool_raises() -> None:
+    """ROCmDeviceTelemetryClient raises when no amd-smi/rocm-smi on PATH."""
+    with patch(
+        "gcm.monitoring.device_telemetry_rocm._find_rocm_tool",
+        return_value=None,
+    ):
+        client = ROCmDeviceTelemetryClient(tool_path=None)
+        with pytest.raises(DeviceTelemetryException) as exc_info:
+            client.get_device_count()
+        assert "No ROCm tool found" in str(exc_info.value)
+
+
+def test_rocm_client_amd_smi_list_json() -> None:
+    """ROCmDeviceTelemetryClient uses amd-smi list --json for device count."""
+    list_out = json.dumps({"gpus": [{"gpu_id": 0}, {"gpu_id": 1}]})
+    with patch(
+        "gcm.monitoring.device_telemetry_rocm._find_rocm_tool",
+        return_value="amd-smi",
+    ), patch(
+        "gcm.monitoring.device_telemetry_rocm._run_cmd",
+        return_value=list_out,
+    ) as run_cmd:
+        client = ROCmDeviceTelemetryClient(tool_path="amd-smi")
+        count = client.get_device_count()
+        assert count == 2
+        run_cmd.assert_any_call(["amd-smi", "list", "--json"], 30)
+
+
+def test_rocm_gpu_device_defaults() -> None:
+    """ROCmGPUDevice returns safe defaults for ECC/retired/row_remap."""
+    dev = ROCmGPUDevice(0, {}, {}, [])
+    assert list(dev.get_retired_pages_double_bit_ecc_error()) == []
+    assert list(dev.get_retired_pages_multiple_single_bit_ecc_errors()) == []
+    assert dev.get_retired_pages_pending_status() == 0
+    remap = dev.get_remapped_rows()
+    assert remap.pending == 0 and remap.failure == 0
+    assert dev.get_ecc_uncorrected_volatile_total() == 0
+    assert dev.get_ecc_corrected_volatile_total() == 0
+    assert "AMD" in dev.get_vbios_version() or dev.get_vbios_version() == "AMD-ROCm"
+
+
+def test_rocm_gpu_device_memory_util() -> None:
+    """ROCmGPUDevice maps metrics to memory and utilization."""
+    metrics = {
+        "average_gfx_activity": 0.5,
+        "average_umc_activity": 0.3,
+        "temperature_hotspot": 72,
+        "current_gfxclk": 1800,
+        "current_uclk": 800,
+    }
+    memory = {"total": 32 * 1024, "free": 16 * 1024, "used": 16 * 1024}  # MB
+    dev = ROCmGPUDevice(0, metrics, memory, [])
+    mem = dev.get_memory_info()
+    assert mem.total >= mem.used
+    assert mem.used == mem.total - mem.free
+    util = dev.get_utilization_rates()
+    assert util.gpu == 50
+    assert util.memory == 30
+    assert dev.get_temperature() == 72
+    clock = dev.get_clock_freq()
+    assert clock.graphics_freq == 1800
+    assert clock.memory_freq == 800
+
+
+def test_run_cmd_timeout() -> None:
+    """_run_cmd raises DeviceTelemetryException on timeout."""
+    import subprocess
+
+    with patch(
+        "subprocess.run",
+        side_effect=subprocess.TimeoutExpired("amd-smi", 30),
+    ):
+        with pytest.raises(DeviceTelemetryException) as exc_info:
+            _run_cmd(["amd-smi", "list", "--json"], timeout_secs=30)
+    assert "timed out" in str(exc_info.value).lower()
+
+
+def test_run_cmd_not_found() -> None:
+    """_run_cmd raises DeviceTelemetryException when tool not found."""
+    with patch(
+        "subprocess.run",
+        side_effect=FileNotFoundError("amd-smi not found"),
+    ):
+        with pytest.raises(DeviceTelemetryException) as exc_info:
+            _run_cmd(["amd-smi", "list", "--json"])
+    assert "not found" in str(exc_info.value).lower()
diff --git a/gcm/tests/test_rocm_monitor.py b/gcm/tests/test_rocm_monitor.py
new file mode 100644
index 0000000..d2a25cc
--- /dev/null
+++ b/gcm/tests/test_rocm_monitor.py
@@ -0,0 +1,82 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+"""Tests for rocm_monitor CLI."""
+
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Dict, Iterable, List, Mapping
+
+import pytest
+from click.testing import CliRunner
+
+from gcm.exporters.do_nothing import DoNothing
+from gcm.monitoring.cli.rocm_monitor import (
+    CliObject,
+    main,
+)
+from gcm.monitoring.device_telemetry_client import DeviceTelemetryClient, GPUDevice
+from gcm.monitoring.sink.protocol import SinkImpl
+from gcm.monitoring.sink.utils import Factory
+from gcm.tests.fakes import FakeClock, FakeGPUDevice
+
+
+@dataclass
+class FakeTelemetryClient:
+    devices: List[GPUDevice] = field(default_factory=list)
+
+    def get_device_count(self) -> int:
+        return len(self.devices)
+
+    def get_device_by_index(self, index: int) -> GPUDevice:
+        return self.devices[index]
+
+
+@dataclass
+class FakeRocmCliObject:
+    clock: object = field(default_factory=FakeClock)
+    registry: Mapping[str, Factory[SinkImpl]] = field(
+        default_factory=lambda: {"do_nothing": DoNothing}
+    )
+
+    def get_device_telemetry(self) -> DeviceTelemetryClient:
+        return FakeTelemetryClient(devices=[FakeGPUDevice()])
+
+    def read_env(self, process_id: int) -> Dict[str, str]:
+        return {}
+
+    def get_ram_utilization(self) -> float:
+        return 0.5
+
+    def get_hostname(self) -> str:
+        return "testhost"
+
+    def format_epilog(self) -> str:
+        return ""
+
+    def looptimes(self, once: bool) -> Iterable[int]:
+        return range(1)
+
+
+def test_rocm_monitor_once(tmp_path: Path) -> None:
+    """rocm_monitor --once runs one collection cycle and exits 0."""
+    runner = CliRunner(mix_stderr=False)
+    fake_obj: CliObject = FakeRocmCliObject()
+    result = runner.invoke(
+        main,
+        [
+            f"--log-folder={tmp_path}",
+            "--collect-interval=1",
+            "--push-interval=2",
+            "--sink",
+            "do_nothing",
+            "--stdout",
+            "--once",
+            "--log-level=DEBUG",
+        ],
+        obj=fake_obj,
+        catch_exceptions=False,
+    )
+    assert result.exit_code == 0
+    # One line per device (1) + one for host metrics
+    lines = [l for l in result.stdout.strip().split("\n") if l.strip()]
+    assert len(lines) >= 1
diff --git a/pyproject.toml b/pyproject.toml
index b16eefe..842f566 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,7 @@ license = { text = "MIT" }
 Homepage = "https://github.com/facebookresearch/gcm"
 
 [project.optional-dependencies]
+rocm = [] # AMD/ROCm: amd-smi or rocm-smi on PATH; no extra Python deps
 dev = [
     "parameterized",
     "virtualenv>=20.28.1",
diff --git a/systemd/rocm/fair_cluster_rocm_resources.slice b/systemd/rocm/fair_cluster_rocm_resources.slice
new file mode 100644
index 0000000..4692c58
--- /dev/null
+++ b/systemd/rocm/fair_cluster_rocm_resources.slice
@@ -0,0 +1,10 @@
+[Unit]
+Description=Slice for rocm_monitor
+
+[Slice]
+MemoryHigh=700M
+MemoryMax=1G
+CPUQuota=100%
+IOWeight=100
+IODeviceWeight=/dev/sda2 100
+StartupDeviceWeight=/dev/sda2 100
diff --git a/systemd/rocm/rocm_monitor.service b/systemd/rocm/rocm_monitor.service
new file mode 100644
index 0000000..e6d483b
--- /dev/null
+++ b/systemd/rocm/rocm_monitor.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Collect and publish AMD GPU (ROCm) metrics.
+After=network.target
+
+[Service]
+Type=simple
+User=cluster_monitor
+ExecStart=/usr/bin/gcm rocm_monitor
+Restart=on-failure
+RestartSec=300
+Slice=fair_cluster_rocm_resources.slice
+# penguin does network rate limiting on the file below
+ExecStartPre=sudo /usr/local/sbin/hc_network.sh
diff --git a/website/docs/GCM_Monitoring/collectors/README.md b/website/docs/GCM_Monitoring/collectors/README.md
index 8551384..31692d7 100644
--- a/website/docs/GCM_Monitoring/collectors/README.md
+++ b/website/docs/GCM_Monitoring/collectors/README.md
@@ -5,6 +5,7 @@ This directory contains documentation for all GCM monitoring collectors. Collect
 ## Collectors
 
 - **[nvml_monitor](nvml_monitor.md)** - Collects GPU metrics using NVIDIA NVML library
+- **[rocm_monitor](rocm_monitor.md)** - Collects GPU metrics from AMD GPUs via amd-smi/rocm-smi (ROCm)
 - **[sacct_backfill](sacct_backfill.md)** - Backfills historical job data in time-chunked batches
 - **[sacct_backfill_server](sacct_backfill_server.md)** - Coordination server for multi-cluster backfills
 - **[sacct_publish](sacct_publish.md)** - Transforms and publishes sacct output to sinks
diff --git a/website/docs/GCM_Monitoring/collectors/rocm_monitor.md b/website/docs/GCM_Monitoring/collectors/rocm_monitor.md
new file mode 100644
index 0000000..80a2293
--- /dev/null
+++ b/website/docs/GCM_Monitoring/collectors/rocm_monitor.md
@@ -0,0 +1,38 @@
+# rocm_monitor
+
+## Overview
+Collects GPU metrics from **AMD GPUs** using the ROCm stack (`amd-smi` or `rocm-smi`) and publishes aggregated metrics at regular intervals. Provides real-time monitoring of GPU utilization, memory usage, power consumption, temperature, SLURM job information, and host-level metrics including RAM utilization. The schema matches `nvml_monitor` for consistency across NVIDIA and AMD deployments.
+
+**Requirements**: `amd-smi` (preferred) or `rocm-smi` must be installed on the node and on PATH. See [AMD SMI](https://rocm.docs.amd.com/projects/amdsmi/).
+
+**Data Type**: `DataType.LOG`, **Schemas**: `DevicePlusJobMetrics`
+
+**Data Type**: `DataType.METRIC`, **Schemas**: `HostMetrics`, `IndexedDeviceMetrics`
+
+## Execution Scope
+
+All AMD GPU nodes in the cluster. Use `gcm nvml_monitor` on NVIDIA nodes and `gcm rocm_monitor` on AMD nodes (e.g. via systemd or scheduler).
+
+## Environment Variables
+
+- **ROCR_VISIBLE_DEVICES**: Optional. Comma-separated GPU indices visible to the process (ROCm analogue of `CUDA_VISIBLE_DEVICES`). If unset, all GPUs are visible.
+- **SLURM_JOB_GPUS**: Used by job attribution when present in the process environment.
+
+## Command-Line Options
+
+Same as [nvml_monitor](nvml_monitor.md): `--collect-interval`, `--push-interval`, `--interval`, `--cluster`, `--sink`, `--sink-opts`, `--log-level`, `--log-folder`, `--stdout`, `--heterogeneous-cluster-v1`, `--once`, etc.
+
+## Usage Examples
+
+### Basic Continuous Monitoring
+```bash
+gcm rocm_monitor --sink file --sink-opts filepath=/tmp/amd_gpu_metrics.json
+```
+
+### One-Time Collection
+```bash
+gcm rocm_monitor --once --sink stdout
+```
+
+### systemd (AMD nodes)
+Use the provided unit `systemd/rocm/rocm_monitor.service` and slice `systemd/rocm/fair_cluster_rocm_resources.slice` on AMD GPU nodes.