diff --git a/.github/workflows/gcm_python.yml b/.github/workflows/gcm_python.yml index 264aa40..40d6c06 100644 --- a/.github/workflows/gcm_python.yml +++ b/.github/workflows/gcm_python.yml @@ -63,11 +63,11 @@ jobs: with: path: ~/.cache/venv-ci key: ${{ env.pythonLocation }}-${{ hashFiles('dev-requirements.txt') }} - - name: Install Rust run: | curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y - cp -r $HOME/.cargo/bin/* $HOME/.cache/venv-ci/bin/ + source $HOME/.cargo/env + cp -r $HOME/.cargo/bin/. $HOME/.cache/venv-ci/bin/ - name: Install build dependencies run: | diff --git a/README.md b/README.md index 7c8dec6..9404419 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Facebook has adopted a Code of Conduct that we expect project participants to ad ## The Team -GPU Cluster Monitoring is actively maintained by [Lucca Bertoncini](https://github.com/luccabb), [Caleb Ho](https://github.com/calebho), [Apostolos Kokolis](https://github.com/A-Kokolis), [Liao Hu](https://github.com/L1A0), [Thanh Nguyen](https://github.com/giongto35), [Billy Campoli](https://github.com/tooji) with a number of contributions coming from talented individuals (in no particular order, and non-exhaustive): [Jörg Doku](https://github.com/Jorghi12), [Vivian Peng](https://github.com/vzpeng), [Parth Malani](https://github.com/pmmalani), [Kalyan Saladi](https://github.com/skalyan), [Shubho Sengupta](https://github.com/shubho), [Leo Huang](https://github.com/lifeihuang), [Robert Vincent](https://github.com/bvincent-penguin), [Max Wang](https://github.com/mxw), [Sujit Verma](https://github.com/sujitoc), [Teng Li](https://github.com/teng-li), [James Taylor](https://github.com/jamestaylr), [Xiaodong Ma](https://github.com/xman1979), [Chris Henry](https://github.com/chenry3), [Jakob Johnson](https://github.com/jj10306), [Kareem Sakher](https://github.com/kjsakher), [Abinesh Ramakrishnan](https://github.com/ibanesh), [Nabib Ahmed](https://github.com/nahmed3536), [Yong 
Li](https://github.com/yonglimeta), [Junjie Qian](https://github.com/junjieqian), [David Watson](https://github.com/davidewatson), [Guanyu Wu](https://github.com/kwu-penguin), [Jaromir Latal](https://github.com/jermenkoo), [Samuel Doud](https://github.com/SamuelDoud), [Yidi Wu](https://github.com/ydwu4), [Xinyuan Zhang](https://github.com/xinyuanzzz), [Neha Saxena](https://github.com/nehasaxena210), [Gustavo Lima](https://github.com/gustcol). +GPU Cluster Monitoring is actively maintained by [Lucca Bertoncini](https://github.com/luccabb), [Caleb Ho](https://github.com/calebho), [Apostolos Kokolis](https://github.com/A-Kokolis), [Liao Hu](https://github.com/L1A0), [Thanh Nguyen](https://github.com/giongto35), [Billy Campoli](https://github.com/tooji) with a number of contributions coming from talented individuals (in no particular order, and non-exhaustive): [Jörg Doku](https://github.com/Jorghi12), [Vivian Peng](https://github.com/vzpeng), [Parth Malani](https://github.com/pmmalani), [Kalyan Saladi](https://github.com/skalyan), [Shubho Sengupta](https://github.com/shubho), [Leo Huang](https://github.com/lifeihuang), [Robert Vincent](https://github.com/bvincent-penguin), [Max Wang](https://github.com/mxw), [Sujit Verma](https://github.com/sujitoc), [Teng Li](https://github.com/teng-li), [James Taylor](https://github.com/jamestaylr), [Xiaodong Ma](https://github.com/xman1979), [Chris Henry](https://github.com/chenry3), [Jakob Johnson](https://github.com/jj10306), [Kareem Sakher](https://github.com/kjsakher), [Abinesh Ramakrishnan](https://github.com/ibanesh), [Nabib Ahmed](https://github.com/nahmed3536), [Yong Li](https://github.com/yonglimeta), [Junjie Qian](https://github.com/junjieqian), [David Watson](https://github.com/davidewatson), [Guanyu Wu](https://github.com/kwu-penguin), [Jaromir Latal](https://github.com/jermenkoo), [Samuel Doud](https://github.com/SamuelDoud), [Yidi Wu](https://github.com/ydwu4), [Xinyuan Zhang](https://github.com/xinyuanzzz), [Neha 
Saxena](https://github.com/nehasaxena210), [Achintya Paningapalli](https://github.com/theap06), [Gustavo Lima](https://github.com/gustcol). Feel free to contribute and add your name! diff --git a/dev-requirements.txt b/dev-requirements.txt index 31d7a1b..2897bb5 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -183,6 +183,7 @@ keyring==25.7.0 \ --hash=sha256:fe01bd85eb3f8fb3dd0405defdeac9a5b4f6f0439edbb3149577f244a2e8245b # via twine libcst==1.8.1 \ + --hash=sha256:a748502a2ef57834e7f135a51392dc6a431f2d4547f8b2917cd8e27c91d30c61 \ --hash=sha256:423427819409a1d905017bbd51062bd0f1e4795c74c2f9f52a6b63dd67c282d2 \ --hash=sha256:bdad73ce302741354abd2d0ac54add8bbbffb123a176629f65ce16e0dff012f6 # via @@ -382,6 +383,7 @@ pynvml==11.4.1 \ --hash=sha256:d27be542cd9d06558de18e2deffc8022ccd7355bc7382255d477038e7e424c6c # via gcm (pyproject.toml) pyoxidizer==0.24.0 \ + --hash=sha256:1a9940d2bdb6c9e6c6c45eb4de3d4d6e5c9b3a3724dbc7d774d85d4956058447 \ --hash=sha256:ec56f2b99495aa0178e927389a3e151e9669beae4e2bce3f6897fb9891b5502e # via gcm (pyproject.toml) pyproject-hooks==1.2.0 \ diff --git a/gcm/health_checks/cli/health_checks.py b/gcm/health_checks/cli/health_checks.py index 44b8660..f29535c 100644 --- a/gcm/health_checks/cli/health_checks.py +++ b/gcm/health_checks/cli/health_checks.py @@ -26,9 +26,19 @@ @feature_flags_config(FeatureValueHealthChecksFeatures) @toml_config_option("health_checks", default_config_path=DEFAULT_CONFIG_PATH) @detach_option +@click.option( + "--backend", + type=click.Choice(["nvml"]), + default="nvml", + show_default=True, + help="Accelerator backend used by GPU health checks.", +) @click.version_option(__version__) -def health_checks(detach: bool) -> None: +def health_checks(detach: bool, backend: str) -> None: """GPU Cluster Monitoring: Large-Scale AI Research Cluster Monitoring.""" + ctx = click.get_current_context() + if isinstance(ctx.obj, dict): + ctx.obj["accelerator_backend"] = backend list_of_checks: List[click.core.Command] = 
[ diff --git a/gcm/monitoring/accelerator/README.md new file mode 100644 index 0000000..43a5b64 --- /dev/null +++ b/gcm/monitoring/accelerator/README.md @@ -0,0 +1,94 @@ +# Accelerator HAL (Python) + +This package provides a hardware-agnostic accelerator abstraction for a +Python-first observability codebase. + +## Layout + +```text +gcm/monitoring/accelerator/ + backend.py # core interfaces and identity models + metrics.py # normalized metrics and capability model + errors.py # typed errors for backend operations + manager.py # backend orchestration and routing + probe.py # dynamic shared library probe helpers + registry.py # default backend registration + backends/ + nvml.py +``` + +## Design notes + +- Backends are discovered and probed at runtime; missing drivers degrade + gracefully. +- Metric output uses a single normalized `MetricSet` type. +- Optional vendor fields remain `None` unless supported by backend capability. +- This design can be implemented directly in Python or backed by Rust/C++ + worker processes behind the same backend protocol. + +## Lifecycle + +1. Build an `AcceleratorManager` from `default_backend_factories()`. +2. Call `probe_all()` to initialize and retain healthy backends. +3. Call `refresh_devices()` to enumerate backend devices and cache handles. +4. Call `read_all_metrics()` with a `MetricRequest` during each collection loop. +5. Call `close()` on shutdown. + +## Backend authoring guide + +- Implement `AcceleratorBackend` methods in `backends/<backend_name>.py`. +- `probe()` should only verify runtime readiness and return a clear reason on + failure. +- `enumerate_devices()` should return stable, backend-scoped `DeviceHandle.id` + values. +- `read_metrics()` should map into normalized `MetricSet` fields and avoid + failing the full read when a single metric is unavailable. +- Keep unsupported fields as `None` and gate behavior through `CapabilitySet`. 
+ +## Scope in this PR + +- Includes a functional NVML backend only. +- Keeps the HAL contract/manager generic so additional backends can be added in + follow-up PRs. + +## Migration note + +- HAL behavior is Python-first to simplify integration and testability. +- If needed later, vendor-specific FFI logic can move into Rust/C++ sidecar + workers without changing the Python HAL interface. + +## Test plan + +### Full-run commands (with output) + +**gcm** (single collection, stdout sink): + +```bash +gcm --backend=nvml nvml_monitor --sink=stdout --once --log-folder=/tmp/gcm-log +``` + +Example output (with NVIDIA GPUs present): + +```json +[{"gpu_id": 0, "hostname": "node01", "mem_util": 45, "gpu_util": 32, ...}] +[{"gpu_index": 0, "max_gpu_util": 32, "min_gpu_util": 28, ...}] +``` + +Without GPUs: exits with `DeviceTelemetryException` / NVML not found. + +**health_checks** (nvidia-smi gpu_num check, stdout sink): + +```bash +health_checks --backend=nvml check-nvidia-smi fair_cluster nagios --sink=stdout -c gpu_num --gpu_num=0 +``` + +Example output: + +```json +[{"node": "node01", "cluster": "fair_cluster", "health_check": "nvidia smi", "type": "nagios", "result": 0, "_msg": "Number of GPUs present is the same as expected, 0", ...}] +``` + +### Automated tests + +- `pytest -q gcm/tests/test_accelerator_hal.py` +- `pytest -q gcm/tests/test_gcm.py -k "backend or full_run"` diff --git a/gcm/monitoring/accelerator/__init__.py b/gcm/monitoring/accelerator/__init__.py new file mode 100644 index 0000000..9a2525a --- /dev/null +++ b/gcm/monitoring/accelerator/__init__.py @@ -0,0 +1,37 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+from gcm.monitoring.accelerator.backend import ( + AcceleratorBackend, + BackendName, + DeviceHandle, + ProbeResult, +) +from gcm.monitoring.accelerator.errors import ( + AcceleratorError, + BackendUnavailableError, + UnsupportedOperationError, +) +from gcm.monitoring.accelerator.manager import AcceleratorManager +from gcm.monitoring.accelerator.metrics import ( + Capability, + CapabilitySet, + MetricRequest, + MetricSet, +) +from gcm.monitoring.accelerator.registry import default_backend_factories + +__all__ = [ + "AcceleratorBackend", + "AcceleratorError", + "AcceleratorManager", + "BackendName", + "BackendUnavailableError", + "Capability", + "CapabilitySet", + "DeviceHandle", + "MetricRequest", + "MetricSet", + "ProbeResult", + "UnsupportedOperationError", + "default_backend_factories", +] diff --git a/gcm/monitoring/accelerator/backend.py b/gcm/monitoring/accelerator/backend.py new file mode 100644 index 0000000..464cc06 --- /dev/null +++ b/gcm/monitoring/accelerator/backend.py @@ -0,0 +1,51 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Callable, List, Protocol + +from gcm.monitoring.accelerator.metrics import CapabilitySet, MetricRequest, MetricSet + + +class BackendName(str, Enum): + NVML = "nvml" + + +@dataclass(frozen=True) +class ProbeResult: + backend: BackendName + healthy: bool + reason: str + library_path: str | None = None + driver_version: str | None = None + probed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + +@dataclass(frozen=True) +class DeviceHandle: + backend: BackendName + id: str + vendor: str + model: str | None = None + bus_id: str | None = None + serial: str | None = None + + +class AcceleratorBackend(Protocol): + def name(self) -> BackendName: ... + + def probe(self) -> ProbeResult: ... + + def enumerate_devices(self) -> List[DeviceHandle]: ... 
+ + def capabilities(self, device: DeviceHandle) -> CapabilitySet: ... + + def read_metrics( + self, device: DeviceHandle, request: MetricRequest + ) -> MetricSet: ... + + def close(self) -> None: ... + + +BackendFactory = Callable[[], AcceleratorBackend] diff --git a/gcm/monitoring/accelerator/backends/__init__.py b/gcm/monitoring/accelerator/backends/__init__.py new file mode 100644 index 0000000..ae1b0cf --- /dev/null +++ b/gcm/monitoring/accelerator/backends/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. diff --git a/gcm/monitoring/accelerator/backends/nvml.py b/gcm/monitoring/accelerator/backends/nvml.py new file mode 100644 index 0000000..2c2ba47 --- /dev/null +++ b/gcm/monitoring/accelerator/backends/nvml.py @@ -0,0 +1,181 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Callable, Optional, TypeVar + +from gcm.monitoring.accelerator.backend import BackendName, DeviceHandle, ProbeResult +from gcm.monitoring.accelerator.errors import ( + BackendUnavailableError, + UnsupportedOperationError, +) +from gcm.monitoring.accelerator.metrics import ( + Capability, + CapabilitySet, + MetricRequest, + MetricSet, +) +from gcm.monitoring.accelerator.probe import find_and_load_library +from gcm.monitoring.device_telemetry_client import ( + DeviceTelemetryClient, + DeviceTelemetryException, +) +from gcm.schemas.gpu.application_clock import ApplicationClockInfo +from gcm.schemas.gpu.memory import GPUMemory +from gcm.schemas.gpu.utilization import GPUUtilization + +_NAMES = ["nvidia-ml"] +_PATHS = [ + "/usr/lib/x86_64-linux-gnu/libnvidia-ml.so.1", + "/usr/lib64/libnvidia-ml.so.1", + "/usr/lib/libnvidia-ml.so.1", +] + +_T = TypeVar("_T") + + +def _default_nvml_client_factory() -> DeviceTelemetryClient: + # Keep the import lazy so this package can still be imported in + # environments 
where pynvml is unavailable. + from gcm.monitoring.device_telemetry_nvml import NVMLDeviceTelemetryClient + + return NVMLDeviceTelemetryClient() + + +@dataclass +class NVMLBackend: + telemetry_client_factory: Callable[[], DeviceTelemetryClient] = ( + _default_nvml_client_factory + ) + _client: Optional[DeviceTelemetryClient] = field( + default=None, init=False, repr=False + ) + + def name(self) -> BackendName: + return BackendName.NVML + + def _ensure_client(self) -> DeviceTelemetryClient: + if self._client is None: + self._client = self.telemetry_client_factory() + return self._client + + def probe(self) -> ProbeResult: + path = find_and_load_library(_NAMES, _PATHS) + if path is None: + raise BackendUnavailableError("NVML shared library not found") + client = self._ensure_client() + try: + client.get_device_count() + except DeviceTelemetryException as e: + raise BackendUnavailableError("NVML initialization failed") from e + return ProbeResult( + backend=self.name(), + healthy=True, + reason="ready", + library_path=path, + probed_at=datetime.now(timezone.utc), + ) + + def enumerate_devices(self) -> list[DeviceHandle]: + client = self._ensure_client() + try: + device_count = client.get_device_count() + devices: list[DeviceHandle] = [] + for index in range(device_count): + model: Optional[str] = None + handle = client.get_device_by_index(index) + model_getter = getattr(handle, "get_name", None) + if callable(model_getter): + maybe_model = self._safe_call(model_getter) + if isinstance(maybe_model, str): + model = maybe_model + devices.append( + DeviceHandle( + backend=self.name(), + id=str(index), + vendor="nvidia", + model=model, + ) + ) + return devices + except DeviceTelemetryException as e: + raise UnsupportedOperationError("NVML enumerate_devices failed") from e + + def capabilities(self, _device: DeviceHandle) -> CapabilitySet: + return CapabilitySet( + values={ + Capability.UTILIZATION, + Capability.MEMORY, + Capability.POWER, + Capability.THERMALS, + 
Capability.CLOCKS, + Capability.ECC, + Capability.PROCESSES, + } + ) + + @staticmethod + def _safe_call(func: Callable[[], _T]) -> _T | None: + try: + return func() + except DeviceTelemetryException: + return None + + def read_metrics(self, device: DeviceHandle, _request: MetricRequest) -> MetricSet: + # TODO: Wire MetricRequest.include_process_info once process telemetry + # is available through HAL MetricSet. + client = self._ensure_client() + try: + index = int(device.id) + handle = client.get_device_by_index(index) + except (ValueError, DeviceTelemetryException) as e: + raise UnsupportedOperationError( + f"invalid NVML device id: {device.id}" + ) from e + + utilization: GPUUtilization | None = self._safe_call( + handle.get_utilization_rates + ) + memory: GPUMemory | None = self._safe_call(handle.get_memory_info) + temperature: int | None = self._safe_call(handle.get_temperature) + power_usage: int | None = self._safe_call(handle.get_power_usage) + power_limit: int | None = self._safe_call(handle.get_enforced_power_limit) + clocks: ApplicationClockInfo | None = self._safe_call(handle.get_clock_freq) + ecc_corrected: int | None = self._safe_call( + handle.get_ecc_corrected_volatile_total + ) + ecc_uncorrected: int | None = self._safe_call( + handle.get_ecc_uncorrected_volatile_total + ) + + return MetricSet( + timestamp=datetime.now(timezone.utc), + core_util_pct=(float(utilization.gpu) if utilization is not None else None), + mem_util_pct=( + float(utilization.memory) if utilization is not None else None + ), + mem_total_bytes=(int(memory.total) if memory is not None else None), + mem_used_bytes=(int(memory.used) if memory is not None else None), + temp_c=(float(temperature) if temperature is not None else None), + power_w=(float(power_usage) / 1000.0 if power_usage is not None else None), + power_limit_w=( + float(power_limit) / 1000.0 if power_limit is not None else None + ), + sm_clock_mhz=(int(clocks.graphics_freq) if clocks is not None else None), + 
mem_clock_mhz=(int(clocks.memory_freq) if clocks is not None else None), + ecc_corrected=(int(ecc_corrected) if ecc_corrected is not None else None), + ecc_uncorrected=( + int(ecc_uncorrected) if ecc_uncorrected is not None else None + ), + ) + + def close(self) -> None: + client = self._client + self._client = None + if client is None: + return None + + close_method = getattr(client, "close", None) + if callable(close_method): + close_method() + return None diff --git a/gcm/monitoring/accelerator/errors.py b/gcm/monitoring/accelerator/errors.py new file mode 100644 index 0000000..bc249fd --- /dev/null +++ b/gcm/monitoring/accelerator/errors.py @@ -0,0 +1,26 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from dataclasses import dataclass + +from gcm.monitoring.accelerator.backend import BackendName + + +class AcceleratorError(Exception): + """Base exception type for accelerator HAL failures.""" + + +class BackendUnavailableError(AcceleratorError): + """Raised when backend probe fails due to missing runtime dependencies.""" + + +class UnsupportedOperationError(AcceleratorError): + """Raised when an operation is not implemented by a backend.""" + + +@dataclass(frozen=True) +class BackendOperationError(AcceleratorError): + backend: BackendName + operation: str + + def __str__(self) -> str: + return f"backend={self.backend.value} operation={self.operation}" diff --git a/gcm/monitoring/accelerator/manager.py b/gcm/monitoring/accelerator/manager.py new file mode 100644 index 0000000..9921412 --- /dev/null +++ b/gcm/monitoring/accelerator/manager.py @@ -0,0 +1,80 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+from gcm.monitoring.accelerator.backend import ( + AcceleratorBackend, + BackendFactory, + BackendName, + DeviceHandle, + ProbeResult, +) +from gcm.monitoring.accelerator.errors import BackendOperationError +from gcm.monitoring.accelerator.metrics import MetricRequest, MetricSet + + +class AcceleratorManager: + def __init__(self, factories: dict[BackendName, BackendFactory]) -> None: + self._factories = dict(factories) + self._backends: dict[BackendName, AcceleratorBackend] = {} + self._devices: dict[str, DeviceHandle] = {} + + def probe_all(self) -> dict[BackendName, ProbeResult]: + # Reset previously active backends so reprobe can refresh state. + self.close() + results: dict[BackendName, ProbeResult] = {} + for name, factory in self._factories.items(): + backend = factory() + try: + result = backend.probe() + except Exception as e: + results[name] = ProbeResult(backend=name, healthy=False, reason=str(e)) + backend.close() + continue + + results[name] = result + if result.healthy: + self._backends[name] = backend + else: + backend.close() + return results + + def refresh_devices(self) -> None: + next_devices: dict[str, DeviceHandle] = {} + for name, backend in self._backends.items(): + try: + devices = backend.enumerate_devices() + except Exception as e: + raise BackendOperationError( + backend=name, + operation="enumerate_devices", + ) from e + for device in devices: + key = f"{device.backend.value}/{device.id}" + next_devices[key] = device + self._devices = next_devices + + def devices(self) -> list[DeviceHandle]: + return list(self._devices.values()) + + def get_backend(self, name: BackendName) -> AcceleratorBackend | None: + return self._backends.get(name) + + def read_all_metrics(self, request: MetricRequest) -> dict[str, MetricSet]: + results: dict[str, MetricSet] = {} + for key, device in self._devices.items(): + backend = self._backends.get(device.backend) + if backend is None: + continue + try: + results[key] = backend.read_metrics(device, request) + 
except Exception as e: + raise BackendOperationError( + backend=device.backend, + operation="read_metrics", + ) from e + return results + + def close(self) -> None: + for backend in self._backends.values(): + backend.close() + self._backends = {} + self._devices = {} diff --git a/gcm/monitoring/accelerator/metrics.py b/gcm/monitoring/accelerator/metrics.py new file mode 100644 index 0000000..ce82b2a --- /dev/null +++ b/gcm/monitoring/accelerator/metrics.py @@ -0,0 +1,50 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum + + +class Capability(str, Enum): + UTILIZATION = "utilization" + MEMORY = "memory" + POWER = "power" + THERMALS = "thermals" + CLOCKS = "clocks" + ECC = "ecc" + TOPOLOGY = "topology" + PROCESSES = "processes" + + +@dataclass(frozen=True) +class CapabilitySet: + values: set[Capability] + + def supports(self, capability: Capability) -> bool: + return capability in self.values + + +@dataclass(frozen=True) +class MetricRequest: + include_process_info: bool = False + + +@dataclass(frozen=True) +class MetricSet: + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + core_util_pct: float | None = None + mem_util_pct: float | None = None + + mem_total_bytes: int | None = None + mem_used_bytes: int | None = None + + temp_c: float | None = None + power_w: float | None = None + power_limit_w: float | None = None + + sm_clock_mhz: int | None = None + mem_clock_mhz: int | None = None + + ecc_corrected: int | None = None + ecc_uncorrected: int | None = None diff --git a/gcm/monitoring/accelerator/probe.py b/gcm/monitoring/accelerator/probe.py new file mode 100644 index 0000000..4d58001 --- /dev/null +++ b/gcm/monitoring/accelerator/probe.py @@ -0,0 +1,26 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. 
+from ctypes import CDLL +from ctypes.util import find_library + + +def first_existing_library(candidates: list[str]) -> str | None: + for path in candidates: + try: + CDLL(path) + return path + except OSError: + continue + return None + + +def find_and_load_library(names: list[str], path_candidates: list[str]) -> str | None: + for name in names: + discovered = find_library(name) + if discovered is not None: + try: + CDLL(discovered) + return discovered + except OSError: + continue + return first_existing_library(path_candidates) diff --git a/gcm/monitoring/accelerator/registry.py b/gcm/monitoring/accelerator/registry.py new file mode 100644 index 0000000..b56b0d0 --- /dev/null +++ b/gcm/monitoring/accelerator/registry.py @@ -0,0 +1,10 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from gcm.monitoring.accelerator.backend import BackendFactory, BackendName +from gcm.monitoring.accelerator.backends.nvml import NVMLBackend + + +def default_backend_factories() -> dict[BackendName, BackendFactory]: + return { + BackendName.NVML: NVMLBackend, + } diff --git a/gcm/monitoring/cli/gcm.py b/gcm/monitoring/cli/gcm.py index f466b89..ecdaeb3 100644 --- a/gcm/monitoring/cli/gcm.py +++ b/gcm/monitoring/cli/gcm.py @@ -30,9 +30,20 @@ @click.group(cls=DaemonGroup, epilog=f"GCM Version: {__version__}") @toml_config_option("gcm") @detach_option +@click.option( + "--backend", + type=click.Choice(["nvml"]), + default="nvml", + show_default=True, + help="Accelerator backend used by GPU telemetry paths.", +) @click.version_option(__version__) -def main(detach: bool) -> None: +def main(detach: bool, backend: str) -> None: """GPU cluster monitoring. 
A toolkit for HPC cluster telemetry and health checks.""" + ctx = click.get_current_context() + if not isinstance(ctx.obj, dict): + ctx.obj = {} + ctx.obj["accelerator_backend"] = backend main.add_command(nvml_monitor.main, name="nvml_monitor") diff --git a/gcm/monitoring/cli/nvml_monitor.py b/gcm/monitoring/cli/nvml_monitor.py index 7e4a5e3..73067fc 100644 --- a/gcm/monitoring/cli/nvml_monitor.py +++ b/gcm/monitoring/cli/nvml_monitor.py @@ -26,6 +26,9 @@ import click from gcm.exporters import registry +from gcm.monitoring.accelerator.backend import BackendName +from gcm.monitoring.accelerator.manager import AcceleratorManager +from gcm.monitoring.accelerator.registry import default_backend_factories from gcm.monitoring.accumulate import Accumulator from gcm.monitoring.click import ( click_default_cmd, @@ -71,6 +74,49 @@ logger: logging.Logger # initialization in main() +def _selected_backend_name() -> BackendName: + ctx = click.get_current_context(silent=True) + if ctx is None: + return BackendName.NVML + + root = ctx.find_root() + if isinstance(root.obj, dict): + backend_from_obj = root.obj.get("accelerator_backend") + if isinstance(backend_from_obj, str): + try: + return BackendName(backend_from_obj) + except ValueError: + pass + + backend_from_params = root.params.get("backend") + if isinstance(backend_from_params, str): + try: + return BackendName(backend_from_params) + except ValueError: + pass + + return BackendName.NVML + + +def _probe_selected_backend(backend_name: BackendName) -> None: + factories = default_backend_factories() + selected_factory = factories.get(backend_name) + if selected_factory is None: + raise click.ClickException(f"Unsupported accelerator backend: {backend_name}") + + manager = AcceleratorManager(factories={backend_name: selected_factory}) + try: + results = manager.probe_all() + result = results.get(backend_name) + if result is None or not result.healthy: + reason = result.reason if result is not None else "no probe result" + raise 
click.ClickException( + f"Accelerator backend '{backend_name.value}' is unavailable: {reason}" + ) + finally: + manager.close() + + def get_device_metrics_basic(handle: GPUDevice) -> DeviceMetrics: """Retrieve the device metrics.""" metrics = DeviceMetrics(mem_used_percent=-1) @@ -315,6 +361,9 @@ def main( """Script for reading gpu metrics on the node.""" global logger + if obj is _default_obj: + _probe_selected_backend(_selected_backend_name()) + device_telemetry = obj.get_device_telemetry() device_count = device_telemetry.get_device_count() diff --git a/gcm/monitoring/device_telemetry_nvml.py b/gcm/monitoring/device_telemetry_nvml.py index 73d63fc..c709b9d 100644 --- a/gcm/monitoring/device_telemetry_nvml.py +++ b/gcm/monitoring/device_telemetry_nvml.py @@ -111,6 +111,10 @@ def get_utilization_rates(self) -> GPUUtilization: def get_vbios_version(self) -> str: return pynvml.nvmlDeviceGetVbiosVersion(self.handle) + @pynvml_exception_handler + def get_name(self) -> str: + return str(pynvml.nvmlDeviceGetName(self.handle)) + @pynvml_exception_handler def get_clock_freq(self) -> ApplicationClockInfo: # For the type parameter https://github.com/gpuopenanalytics/pynvml/blob/41e1657948b18008d302f5cb8af06539adc7c792/pynvml/nvml.py#L168 @@ -137,3 +141,7 @@ def get_device_count(self) -> int: def get_device_by_index(self, index: int) -> NVMLGPUDevice: device = pynvml.nvmlDeviceGetHandleByIndex(index) return NVMLGPUDevice(device) + + @pynvml_exception_handler + def close(self) -> None: + pynvml.nvmlShutdown() diff --git a/gcm/tests/health_checks_tests/test_health_checks.py b/gcm/tests/health_checks_tests/test_health_checks.py index c238464..9972a71 100644 --- a/gcm/tests/health_checks_tests/test_health_checks.py +++ b/gcm/tests/health_checks_tests/test_health_checks.py @@ -12,3 +12,14 @@ def test_cli(command: str) -> None: result = runner.invoke(hc_main, [command, "--help"], catch_exceptions=False) assert result.stdout.strip() != "" + + +def test_backend_option_is_accepted() 
-> None: + runner = CliRunner() + result = runner.invoke( + hc_main, + ["--backend", "nvml", "check-nvidia-smi", "--help"], + catch_exceptions=False, + ) + assert result.exit_code == 0 + assert "--sink" in result.stdout diff --git a/gcm/tests/test_accelerator_hal.py b/gcm/tests/test_accelerator_hal.py new file mode 100644 index 0000000..a41ae88 --- /dev/null +++ b/gcm/tests/test_accelerator_hal.py @@ -0,0 +1,347 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +from dataclasses import dataclass +from typing import cast + +import pytest + +from gcm.monitoring.accelerator.backend import BackendName, DeviceHandle, ProbeResult +from gcm.monitoring.accelerator.backends.nvml import NVMLBackend +from gcm.monitoring.accelerator.errors import ( + BackendOperationError, + BackendUnavailableError, + UnsupportedOperationError, +) +from gcm.monitoring.accelerator.manager import AcceleratorManager +from gcm.monitoring.accelerator.metrics import Capability, MetricRequest +from gcm.monitoring.accelerator.probe import find_and_load_library +from gcm.monitoring.accelerator.registry import default_backend_factories +from gcm.monitoring.device_telemetry_client import ( + DeviceTelemetryClient, + DeviceTelemetryException, +) +from gcm.schemas.gpu.application_clock import ApplicationClockInfo +from gcm.schemas.gpu.memory import GPUMemory +from gcm.schemas.gpu.utilization import GPUUtilization + + +@dataclass +class _FakeGPUDevice: + def get_name(self) -> str: + return "NVIDIA H100" + + def get_vbios_version(self) -> str: + return "vbios-1.2.3" + + def get_utilization_rates(self) -> GPUUtilization: + return GPUUtilization(gpu=73, memory=42) + + def get_memory_info(self) -> GPUMemory: + return GPUMemory(total=1000, free=400, used=600) + + def get_temperature(self) -> int: + return 67 + + def get_power_usage(self) -> int: + return 250000 + + def get_enforced_power_limit(self) -> int: + return 300000 + + def get_clock_freq(self) -> ApplicationClockInfo: + 
return ApplicationClockInfo(graphics_freq=1200, memory_freq=1500) + + def get_ecc_corrected_volatile_total(self) -> int: + return 11 + + def get_ecc_uncorrected_volatile_total(self) -> int: + return 2 + + +class _FakeTelemetryClient: + def __init__(self) -> None: + self.closed = False + + def get_device_count(self) -> int: + return 2 + + def get_device_by_index(self, index: int) -> _FakeGPUDevice: + del index + return _FakeGPUDevice() + + def close(self) -> None: + self.closed = True + + +@dataclass +class _FailingFieldGPUDevice(_FakeGPUDevice): + def get_temperature(self) -> int: + raise DeviceTelemetryException() + + +class _PartialFailureTelemetryClient(_FakeTelemetryClient): + def get_device_by_index(self, index: int) -> _FailingFieldGPUDevice: + del index + return _FailingFieldGPUDevice() + + +def test_nvml_backend_probe_and_read_metrics(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.backends.nvml.find_and_load_library", + lambda names, paths: "/usr/lib/libnvidia-ml.so.1", + ) + backend = NVMLBackend( + telemetry_client_factory=lambda: cast( + DeviceTelemetryClient, _FakeTelemetryClient() + ) + ) + + probe_result = backend.probe() + assert probe_result.healthy is True + assert probe_result.library_path == "/usr/lib/libnvidia-ml.so.1" + + devices = backend.enumerate_devices() + assert len(devices) == 2 + assert devices[0].backend == BackendName.NVML + assert devices[0].vendor == "nvidia" + assert devices[0].model == "NVIDIA H100" + + metrics = backend.read_metrics(devices[0], MetricRequest()) + assert metrics.core_util_pct == 73.0 + assert metrics.mem_util_pct == 42.0 + assert metrics.mem_total_bytes == 1000 + assert metrics.mem_used_bytes == 600 + assert metrics.power_w == 250.0 + assert metrics.power_limit_w == 300.0 + assert metrics.sm_clock_mhz == 1200 + assert metrics.mem_clock_mhz == 1500 + assert metrics.ecc_corrected == 11 + assert metrics.ecc_uncorrected == 2 + + capabilities = 
backend.capabilities(devices[0]) + assert capabilities.supports(Capability.ECC) + + +def test_nvml_backend_invalid_device_id(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.backends.nvml.find_and_load_library", + lambda names, paths: "/usr/lib/libnvidia-ml.so.1", + ) + backend = NVMLBackend( + telemetry_client_factory=lambda: cast( + DeviceTelemetryClient, _FakeTelemetryClient() + ) + ) + backend.probe() + + with pytest.raises(UnsupportedOperationError): + backend.read_metrics( + DeviceHandle(backend=BackendName.NVML, id="not-an-int", vendor="nvidia"), + MetricRequest(), + ) + + +def test_nvml_backend_partial_failure_yields_partial_metrics( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.backends.nvml.find_and_load_library", + lambda names, paths: "/usr/lib/libnvidia-ml.so.1", + ) + backend = NVMLBackend( + telemetry_client_factory=lambda: cast( + DeviceTelemetryClient, _PartialFailureTelemetryClient() + ) + ) + backend.probe() + device = backend.enumerate_devices()[0] + + metrics = backend.read_metrics(device, MetricRequest()) + assert metrics.core_util_pct == 73.0 + assert metrics.mem_total_bytes == 1000 + # Temperature call fails in fake device, but other fields still map. 
+ assert metrics.temp_c is None + assert metrics.power_w == 250.0 + + +def test_nvml_backend_probe_missing_library(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.backends.nvml.find_and_load_library", + lambda names, paths: None, + ) + backend = NVMLBackend( + telemetry_client_factory=lambda: cast( + DeviceTelemetryClient, _FakeTelemetryClient() + ) + ) + with pytest.raises(BackendUnavailableError): + backend.probe() + + +def test_nvml_backend_close_closes_underlying_client( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.backends.nvml.find_and_load_library", + lambda names, paths: "/usr/lib/libnvidia-ml.so.1", + ) + client = _FakeTelemetryClient() + backend = NVMLBackend( + telemetry_client_factory=lambda: cast(DeviceTelemetryClient, client) + ) + + backend.probe() + backend.close() + + assert client.closed is True + + +class _FakeBackend: + def name(self) -> BackendName: + return BackendName.NVML + + def probe(self) -> ProbeResult: + return ProbeResult(backend=BackendName.NVML, healthy=True, reason="ok") + + def enumerate_devices(self) -> list[DeviceHandle]: + return [DeviceHandle(backend=BackendName.NVML, id="0", vendor="nvidia")] + + def capabilities(self, device: DeviceHandle): # type: ignore[no-untyped-def] + del device + return None + + def read_metrics(self, device: DeviceHandle, request: MetricRequest): # type: ignore[no-untyped-def] + del device, request + # Intentionally sparse to validate manager routing only. 
+ from gcm.monitoring.accelerator.metrics import MetricSet + + return MetricSet() + + def close(self) -> None: + return None + + +def test_manager_probes_refreshes_and_reads() -> None: + manager = AcceleratorManager(factories={BackendName.NVML: lambda: _FakeBackend()}) + probe_results = manager.probe_all() + assert probe_results[BackendName.NVML].healthy is True + + manager.refresh_devices() + devices = manager.devices() + assert len(devices) == 1 + assert devices[0].id == "0" + + metrics = manager.read_all_metrics(MetricRequest()) + assert "nvml/0" in metrics + + +class _HealthyBackend(_FakeBackend): + def __init__(self, backend_name: BackendName = BackendName.NVML) -> None: + self._backend_name = backend_name + self.closed = False + + def name(self) -> BackendName: + return self._backend_name + + def probe(self) -> ProbeResult: + return ProbeResult(backend=self._backend_name, healthy=True, reason="ok") + + def close(self) -> None: + self.closed = True + + +class _UnhealthyBackend(_FakeBackend): + def __init__(self, backend_name: BackendName = BackendName.NVML) -> None: + self._backend_name = backend_name + self.closed = False + + def name(self) -> BackendName: + return self._backend_name + + def probe(self) -> ProbeResult: + return ProbeResult(backend=self._backend_name, healthy=False, reason="missing") + + def close(self) -> None: + self.closed = True + + +def test_manager_probe_all_unhealthy_backend() -> None: + unhealthy = _UnhealthyBackend(BackendName.NVML) + manager = AcceleratorManager(factories={BackendName.NVML: lambda: unhealthy}) + results = manager.probe_all() + + assert results[BackendName.NVML].healthy is False + assert manager.get_backend(BackendName.NVML) is None + assert unhealthy.closed is True + + +def test_manager_reprobe_closes_stale_backend() -> None: + first_backend = _HealthyBackend() + second_backend = _HealthyBackend() + created = [first_backend, second_backend] + + def _factory() -> _HealthyBackend: + return created.pop(0) + + manager = 
AcceleratorManager(factories={BackendName.NVML: _factory}) + manager.probe_all() + assert manager.get_backend(BackendName.NVML) is first_backend + assert first_backend.closed is False + + manager.probe_all() + assert first_backend.closed is True + assert manager.get_backend(BackendName.NVML) is second_backend + + +class _BrokenEnumerateBackend(_HealthyBackend): + def enumerate_devices(self) -> list[DeviceHandle]: + raise RuntimeError("enumerate boom") + + +def test_manager_wraps_enumerate_errors() -> None: + manager = AcceleratorManager( + factories={BackendName.NVML: lambda: _BrokenEnumerateBackend()} + ) + manager.probe_all() + with pytest.raises(BackendOperationError): + manager.refresh_devices() + + +def test_probe_prefers_discovered_library(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.probe.find_library", lambda _: "libA" + ) + + loaded_paths: list[str] = [] + + def _fake_cdll(path: str) -> object: + loaded_paths.append(path) + return object() + + monkeypatch.setattr("gcm.monitoring.accelerator.probe.CDLL", _fake_cdll) + selected = find_and_load_library(["nvidia-ml"], ["/fallback/libnvidia-ml.so"]) + assert selected == "libA" + assert loaded_paths == ["libA"] + + +def test_probe_fallback_when_discovered_library_unloadable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "gcm.monitoring.accelerator.probe.find_library", lambda _: "libA" + ) + + def _fake_cdll(path: str) -> object: + if path == "libA": + raise OSError("bad lib") + return object() + + monkeypatch.setattr("gcm.monitoring.accelerator.probe.CDLL", _fake_cdll) + selected = find_and_load_library(["nvidia-ml"], ["/fallback/libnvidia-ml.so"]) + assert selected == "/fallback/libnvidia-ml.so" + + +def test_registry_includes_expected_backends() -> None: + factories = default_backend_factories() + assert BackendName.NVML in factories + assert len(factories) == 1 diff --git a/gcm/tests/test_gcm.py b/gcm/tests/test_gcm.py index 
0b01cb2..c5e39ec 100644 --- a/gcm/tests/test_gcm.py +++ b/gcm/tests/test_gcm.py @@ -1,11 +1,27 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. +import json +import subprocess +import sys +from pathlib import Path + import pytest from click.testing import CliRunner from gcm.monitoring.cli.gcm import main +def _run_cli( + module: str, args: list[str], timeout: int = 30 +) -> subprocess.CompletedProcess: + return subprocess.run( + [sys.executable, "-m", module] + args, + capture_output=True, + text=True, + timeout=timeout, + ) + + @pytest.mark.parametrize("command", main.commands.keys()) def test_cli(command: str) -> None: if command == "fsacct": @@ -17,3 +33,75 @@ def test_cli(command: str) -> None: result = runner.invoke(main, [command, "--help"], catch_exceptions=False) assert result.stdout.strip() != "" + + +def test_backend_option_is_accepted() -> None: + runner = CliRunner() + result = runner.invoke( + main, + ["--backend", "nvml", "nvml_monitor", "--help"], + catch_exceptions=False, + ) + assert result.exit_code == 0 + assert "--sink" in result.stdout + + +def test_gcm_backend_nvml_full_run(tmp_path: Path) -> None: + """Full run: gcm --backend=nvml nvml_monitor --sink=stdout --once""" + proc = _run_cli( + "gcm.monitoring.cli.gcm", + [ + "--backend", + "nvml", + "nvml_monitor", + "--sink", + "stdout", + "--once", + f"--log-folder={tmp_path}", + ], + ) + # With GPU: exit 0, stdout has JSON lines (device metrics) + # Without GPU: exit 1, NVML not found + if proc.returncode != 0: + assert "NVML" in proc.stderr or "DeviceTelemetry" in proc.stderr + return + lines = [line for line in proc.stdout.strip().split("\n") if line.strip()] + if not lines: + pytest.skip("No stdout (no GPU or output not captured)") + # Stdout sink prints JSON arrays per write + parsed = json.loads(lines[0]) + assert isinstance(parsed, list) and len(parsed) >= 1 + assert "hostname" in parsed[0] or "gpu_id" in parsed[0] + + +def 
test_health_checks_backend_nvml_full_run(tmp_path: Path) -> None: + """Full run: health_checks --backend=nvml check-nvidia-smi ... --sink=stdout""" + proc = _run_cli( + "gcm.health_checks.cli.health_checks", + [ + "--backend", + "nvml", + "check-nvidia-smi", + "fair_cluster", + "nagios", + "--sink", + "stdout", + "-c", + "gpu_num", + "--gpu_num=0", + f"--log-folder={tmp_path}", + ], + ) + # May fail with gni_lib/ImportError in minimal env - skip in that case + if "gni_lib" in proc.stderr or "ModuleNotFoundError" in proc.stderr: + pytest.skip("health_checks requires gni_lib (full test env)") + # Success: exit 0, stdout has JSON array (may be prefixed by log line) + assert proc.returncode == 0, f"stderr: {proc.stderr}" + out = proc.stdout.strip() + # Extract JSON (may follow "WARNING - ...\n") + json_start = out.find("[") + assert json_start >= 0, f"No JSON array in output: {out[:200]}" + data = json.loads(out[json_start:]) + assert isinstance(data, list) and len(data) >= 1 + row = data[0] + assert "cluster" in row and "health_check" in row and "result" in row