diff --git a/dev-requirements.txt b/dev-requirements.txt index e82d44b..f0af383 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -183,6 +183,7 @@ keyring==25.7.0 \ --hash=sha256:fe01bd85eb3f8fb3dd0405defdeac9a5b4f6f0439edbb3149577f244a2e8245b # via twine libcst==1.8.1 \ --hash=sha256:423427819409a1d905017bbd51062bd0f1e4795c74c2f9f52a6b63dd67c282d2 \ + --hash=sha256:a748502a2ef57834e7f135a51392dc6a431f2d4547f8b2917cd8e27c91d30c61 \ --hash=sha256:bdad73ce302741354abd2d0ac54add8bbbffb123a176629f65ce16e0dff012f6 # via @@ -374,6 +375,7 @@ pynvml==11.4.1 \ --hash=sha256:d27be542cd9d06558de18e2deffc8022ccd7355bc7382255d477038e7e424c6c # via gcm (pyproject.toml) pyoxidizer==0.24.0 \ + --hash=sha256:1a9940d2bdb6c9e6c6c45eb4de3d4d6e5c9b3a3724dbc7d774d85d4956058447 \ --hash=sha256:ec56f2b99495aa0178e927389a3e151e9669beae4e2bce3f6897fb9891b5502e # via gcm (pyproject.toml) pyproject-hooks==1.2.0 \ diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index 3108046..1f97c89 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -1,24 +1,44 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. 
+from __future__ import annotations + +import csv +import io import json +import logging import os from dataclasses import asdict -from typing import Callable, Optional, Tuple +from typing import Any, Callable, cast, Dict, Literal, Optional, Tuple, TYPE_CHECKING from gcm.exporters import register - +from gcm.monitoring.dataclass_utils import asdict_recursive from gcm.monitoring.meta_utils.scuba import to_scuba_message from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams - from gcm.monitoring.utils.monitor import init_logger from gcm.schemas.log import Log +if TYPE_CHECKING: + from _typeshed import DataclassInstance + split_path: Callable[[str], Tuple[str, str]] = lambda path: ( os.path.dirname(path), os.path.basename(path), ) +def _schema_versioned_path(path: str, schema_index: int) -> str: + if schema_index == 0: + return path + stem, ext = os.path.splitext(path) + return f"{stem}_{schema_index}{ext}" + + +def _flatten_for_csv(payload: object) -> Dict[str, Any]: + """Flatten scuba message dict for CSV output.""" + flat = asdict_recursive(to_scuba_message(cast("DataclassInstance", payload))) + return flat if isinstance(flat, dict) else {} + + @register("file") class File: """Write data to file.""" @@ -29,16 +49,29 @@ def __init__( file_path: Optional[str] = None, job_file_path: Optional[str] = None, node_file_path: Optional[str] = None, + format: Literal["json", "csv"] = "json", ): if all(path is None for path in [file_path, job_file_path, node_file_path]): raise Exception( "When using the file sink at least one file_path needs to be specified. 
See gcm %collector% --help" ) - self.data_identifier_to_logger_map = {} + self.format = format + self.data_identifier_to_logger_map: Dict[ + DataIdentifier, Optional[logging.Logger] + ] = {} + self._data_identifier_to_path: Dict[DataIdentifier, str] = {} + self._data_identifier_to_base_path: Dict[DataIdentifier, str] = {} + if self.format == "csv": + self._csv_fieldnames: Dict[str, Tuple[str, ...]] = {} + self._csv_schema_index: Dict[DataIdentifier, int] = {} if file_path is not None: file_directory, file_name = split_path(file_path) + self._data_identifier_to_path[DataIdentifier.GENERIC] = file_path + self._data_identifier_to_base_path[DataIdentifier.GENERIC] = file_path + if self.format == "csv": + self._csv_schema_index[DataIdentifier.GENERIC] = 0 self.data_identifier_to_logger_map[DataIdentifier.GENERIC], _ = init_logger( logger_name=__name__ + file_path, log_dir=file_directory, @@ -48,6 +81,10 @@ def __init__( if job_file_path is not None: file_directory, file_name = split_path(job_file_path) + self._data_identifier_to_path[DataIdentifier.JOB] = job_file_path + self._data_identifier_to_base_path[DataIdentifier.JOB] = job_file_path + if self.format == "csv": + self._csv_schema_index[DataIdentifier.JOB] = 0 self.data_identifier_to_logger_map[DataIdentifier.JOB], _ = init_logger( logger_name=__name__ + job_file_path, log_dir=file_directory, @@ -57,6 +94,10 @@ def __init__( if node_file_path is not None: file_directory, file_name = split_path(node_file_path) + self._data_identifier_to_path[DataIdentifier.NODE] = node_file_path + self._data_identifier_to_base_path[DataIdentifier.NODE] = node_file_path + if self.format == "csv": + self._csv_schema_index[DataIdentifier.NODE] = 0 self.data_identifier_to_logger_map[DataIdentifier.NODE], _ = init_logger( logger_name=__name__ + node_file_path, log_dir=file_directory, @@ -70,24 +111,83 @@ def write( additional_params: SinkAdditionalParams, ) -> None: - # update file path if data_identifier is present on additional_params 
- if additional_params.data_identifier: - data_identifier = additional_params.data_identifier - if data_identifier not in self.data_identifier_to_logger_map: + data_identifier = additional_params.data_identifier or DataIdentifier.GENERIC + if data_identifier not in self.data_identifier_to_logger_map: + raise AssertionError( + f"data_identifier value is unsupported on file sink: {data_identifier}" + ) + if self.data_identifier_to_logger_map[data_identifier] is None: + raise AssertionError( + f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help" + ) + logger = self.data_identifier_to_logger_map[data_identifier] + assert logger is not None + + if self.format == "csv": + path = self._data_identifier_to_path.get(data_identifier) + if path is None: raise AssertionError( - f"data_identifier value is unsupported on file sink: {data_identifier}" + "CSV format requires data_identifier to match a configured path" ) - if self.data_identifier_to_logger_map[data_identifier] is None: - raise AssertionError( - f"The sink is missing a required param for the following data_identifier: {data_identifier}. 
See gcm %collector% --help" + records = [_flatten_for_csv(p) for p in data.message] + if not records: + return + all_keys = sorted({k for r in records for k in r.keys()}) + fieldnames = tuple(all_keys) + previous_fieldnames = self._csv_fieldnames.get(path) + + if ( + previous_fieldnames is not None + and previous_fieldnames != fieldnames + and data_identifier in self._csv_schema_index + ): + next_schema_index = self._csv_schema_index[data_identifier] + 1 + self._csv_schema_index[data_identifier] = next_schema_index + + base_path = self._data_identifier_to_base_path[data_identifier] + path = _schema_versioned_path(base_path, next_schema_index) + self._data_identifier_to_path[data_identifier] = path + + file_directory, file_name = split_path(path) + logger, _ = init_logger( + logger_name=__name__ + path, + log_dir=file_directory, + log_name=file_name, + log_formatter=None, ) - logger = self.data_identifier_to_logger_map[data_identifier] - else: - logger = self.data_identifier_to_logger_map[DataIdentifier.GENERIC] + self.data_identifier_to_logger_map[data_identifier] = logger + previous_fieldnames = self._csv_fieldnames.get(path) + + if previous_fieldnames != fieldnames: + header_buf = io.StringIO() + header_writer = csv.DictWriter( + header_buf, + fieldnames=all_keys, + extrasaction="ignore", + lineterminator="", + ) + header_writer.writeheader() + logger.info(header_buf.getvalue()) + self._csv_fieldnames[path] = fieldnames - for payload in data.message: - # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter - logger.info(json.dumps(asdict(to_scuba_message(payload)))) + row_buf = io.StringIO() + row_writer = csv.DictWriter( + row_buf, + fieldnames=all_keys, + extrasaction="ignore", + lineterminator="", + ) + for record in records: + row_buf.seek(0) + row_buf.truncate(0) + row_writer.writerow(record) + logger.info(row_buf.getvalue()) + elif self.format == "json": + for payload in data.message: + # TODO: remove to_scuba_message once 
slurm_job_monitor migrates to OpenTelemetry exporter + logger.info(json.dumps(asdict(to_scuba_message(payload)))) + else: + raise ValueError(f"Unsupported format: {self.format!r}") def shutdown(self) -> None: pass diff --git a/gcm/tests/test_file_exporter.py b/gcm/tests/test_file_exporter.py new file mode 100644 index 0000000..d19163a --- /dev/null +++ b/gcm/tests/test_file_exporter.py @@ -0,0 +1,72 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +import csv +from dataclasses import dataclass +from pathlib import Path + +from gcm.exporters.file import File +from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams +from gcm.schemas.log import Log + + +@dataclass +class SamplePayload: + job_id: int + state: str + user: str + + +@dataclass +class OtherSamplePayload: + gpu_uuid: str + memory_used_mb: int + + +def test_file_exporter_csv(tmp_path: Path) -> None: + path = tmp_path / "data.csv" + sink = File(file_path=str(path), format="csv") + sink.write( + Log(ts=1000, message=[SamplePayload(job_id=1, state="RUNNING", user="alice")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + sink.write( + Log(ts=2000, message=[SamplePayload(job_id=2, state="PENDING", user="bob")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + with open(path) as f: + rows = list(csv.DictReader(f)) + assert len(rows) == 2 + assert rows[0]["state"] == "RUNNING" + assert rows[0]["user"] == "alice" + assert rows[1]["state"] == "PENDING" + assert rows[1]["user"] == "bob" + + +def test_file_exporter_csv_rolls_over_on_schema_change(tmp_path: Path) -> None: + path = tmp_path / "data.csv" + sink = File(file_path=str(path), format="csv") + + sink.write( + Log(ts=1000, message=[SamplePayload(job_id=1, state="RUNNING", user="alice")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + sink.write( + Log( + ts=2000, + message=[OtherSamplePayload(gpu_uuid="GPU-123", memory_used_mb=2048)], + ), 
+ SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + + lines = path.read_text().splitlines() + assert lines == [ + "job_id,state,user", + "1,RUNNING,alice", + ] + + path_rollover = tmp_path / "data_1.csv" + rollover_lines = path_rollover.read_text().splitlines() + assert rollover_lines == [ + "gpu_uuid,memory_used_mb", + "GPU-123,2048", + ] diff --git a/website/docs/GCM_Health_Checks/exporters/file.md b/website/docs/GCM_Health_Checks/exporters/file.md index 8a8e122..38e7c87 100644 --- a/website/docs/GCM_Health_Checks/exporters/file.md +++ b/website/docs/GCM_Health_Checks/exporters/file.md @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file | Option | Required | Description | |--------|----------|-------------| | `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | ### Basic Usage diff --git a/website/docs/GCM_Monitoring/exporters/file.md b/website/docs/GCM_Monitoring/exporters/file.md index 8a8e122..edd5b02 100644 --- a/website/docs/GCM_Monitoring/exporters/file.md +++ b/website/docs/GCM_Monitoring/exporters/file.md @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file | Option | Required | Description | |--------|----------|-------------| | `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | ### Basic Usage @@ -36,6 +37,8 @@ sink_opts = [ ## Output Format +### JSON (default) + Data is written as newline-delimited JSON (NDJSON), with each monitoring event on a separate line: ```json @@ -48,6 +51,18 @@ This format allows for: - Line-by-line processing with standard tools - Easy parsing with JSON libraries +### CSV + +Use `format=csv` for comma-separated output suitable for spreadsheets and offline analysis: + +```shell +gcm slurm_monitor --sink=file --sink-opt file_path=/var/log/gcm/data.csv --sink-opt format=csv --once +``` + +The first write adds a header row; subsequent writes 
append data rows. If a later +payload has a different schema, the exporter creates a new file with a numeric +suffix (for example, `data_1.csv`) and writes the new header there. + ## Use Cases ### Production Monitoring