Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev-requirements.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

136 changes: 118 additions & 18 deletions gcm/exporters/file.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
from __future__ import annotations

import csv
import io
import json
import logging
import os
from dataclasses import asdict
from typing import Callable, Optional, Tuple
from typing import Any, Callable, cast, Dict, Literal, Optional, Tuple, TYPE_CHECKING

from gcm.exporters import register

from gcm.monitoring.dataclass_utils import asdict_recursive
from gcm.monitoring.meta_utils.scuba import to_scuba_message
from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams

from gcm.monitoring.utils.monitor import init_logger
from gcm.schemas.log import Log

if TYPE_CHECKING:
from _typeshed import DataclassInstance

def split_path(path: str) -> Tuple[str, str]:
    """Split ``path`` into its (directory, filename) components.

    Defined with ``def`` rather than a named lambda (PEP 8 / E731) so the
    callable carries a real ``__name__`` and docstring; behavior is unchanged.
    """
    return os.path.dirname(path), os.path.basename(path)


def _schema_versioned_path(path: str, schema_index: int) -> str:
if schema_index == 0:
return path
stem, ext = os.path.splitext(path)
return f"{stem}_{schema_index}{ext}"


def _flatten_for_csv(payload: object) -> Dict[str, Any]:
    """Flatten a scuba-message dict for CSV output.

    Returns an empty dict when the flattened result is not a mapping, so
    callers can always treat the result as CSV row data.
    """
    scuba_message = to_scuba_message(cast("DataclassInstance", payload))
    flattened = asdict_recursive(scuba_message)
    if isinstance(flattened, dict):
        return flattened
    return {}


@register("file")
class File:
"""Write data to file."""
Expand All @@ -29,16 +49,29 @@ def __init__(
file_path: Optional[str] = None,
job_file_path: Optional[str] = None,
node_file_path: Optional[str] = None,
format: Literal["json", "csv"] = "json",
):
if all(path is None for path in [file_path, job_file_path, node_file_path]):
raise Exception(
"When using the file sink at least one file_path needs to be specified. See gcm %collector% --help"
)

self.data_identifier_to_logger_map = {}
self.format = format
self.data_identifier_to_logger_map: Dict[
DataIdentifier, Optional[logging.Logger]
] = {}
self._data_identifier_to_path: Dict[DataIdentifier, str] = {}
self._data_identifier_to_base_path: Dict[DataIdentifier, str] = {}
if self.format == "csv":
self._csv_fieldnames: Dict[str, Tuple[str, ...]] = {}
self._csv_schema_index: Dict[DataIdentifier, int] = {}

if file_path is not None:
file_directory, file_name = split_path(file_path)
self._data_identifier_to_path[DataIdentifier.GENERIC] = file_path
self._data_identifier_to_base_path[DataIdentifier.GENERIC] = file_path
if self.format == "csv":
self._csv_schema_index[DataIdentifier.GENERIC] = 0
self.data_identifier_to_logger_map[DataIdentifier.GENERIC], _ = init_logger(
logger_name=__name__ + file_path,
log_dir=file_directory,
Expand All @@ -48,6 +81,10 @@ def __init__(

if job_file_path is not None:
file_directory, file_name = split_path(job_file_path)
self._data_identifier_to_path[DataIdentifier.JOB] = job_file_path
self._data_identifier_to_base_path[DataIdentifier.JOB] = job_file_path
if self.format == "csv":
self._csv_schema_index[DataIdentifier.JOB] = 0
self.data_identifier_to_logger_map[DataIdentifier.JOB], _ = init_logger(
logger_name=__name__ + job_file_path,
log_dir=file_directory,
Expand All @@ -57,6 +94,10 @@ def __init__(

if node_file_path is not None:
file_directory, file_name = split_path(node_file_path)
self._data_identifier_to_path[DataIdentifier.NODE] = node_file_path
self._data_identifier_to_base_path[DataIdentifier.NODE] = node_file_path
if self.format == "csv":
self._csv_schema_index[DataIdentifier.NODE] = 0
self.data_identifier_to_logger_map[DataIdentifier.NODE], _ = init_logger(
logger_name=__name__ + node_file_path,
log_dir=file_directory,
Expand All @@ -70,24 +111,83 @@ def write(
additional_params: SinkAdditionalParams,
) -> None:

# update file path if data_identifier is present on additional_params
if additional_params.data_identifier:
data_identifier = additional_params.data_identifier
if data_identifier not in self.data_identifier_to_logger_map:
data_identifier = additional_params.data_identifier or DataIdentifier.GENERIC
if data_identifier not in self.data_identifier_to_logger_map:
raise AssertionError(
f"data_identifier value is unsupported on file sink: {data_identifier}"
)
if self.data_identifier_to_logger_map[data_identifier] is None:
raise AssertionError(
f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help"
)
logger = self.data_identifier_to_logger_map[data_identifier]
assert logger is not None

if self.format == "csv":
path = self._data_identifier_to_path.get(data_identifier)
if path is None:
raise AssertionError(
f"data_identifier value is unsupported on file sink: {data_identifier}"
"CSV format requires data_identifier to match a configured path"
)
if self.data_identifier_to_logger_map[data_identifier] is None:
raise AssertionError(
f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help"
records = [_flatten_for_csv(p) for p in data.message]
if not records:
return
all_keys = sorted({k for r in records for k in r.keys()})
fieldnames = tuple(all_keys)
previous_fieldnames = self._csv_fieldnames.get(path)

if (
previous_fieldnames is not None
and previous_fieldnames != fieldnames
and data_identifier in self._csv_schema_index
):
next_schema_index = self._csv_schema_index[data_identifier] + 1
self._csv_schema_index[data_identifier] = next_schema_index

base_path = self._data_identifier_to_base_path[data_identifier]
path = _schema_versioned_path(base_path, next_schema_index)
self._data_identifier_to_path[data_identifier] = path

file_directory, file_name = split_path(path)
logger, _ = init_logger(
logger_name=__name__ + path,
log_dir=file_directory,
log_name=file_name,
log_formatter=None,
)
logger = self.data_identifier_to_logger_map[data_identifier]
else:
logger = self.data_identifier_to_logger_map[DataIdentifier.GENERIC]
self.data_identifier_to_logger_map[data_identifier] = logger
previous_fieldnames = self._csv_fieldnames.get(path)

if previous_fieldnames != fieldnames:
header_buf = io.StringIO()
header_writer = csv.DictWriter(
header_buf,
fieldnames=all_keys,
extrasaction="ignore",
lineterminator="",
)
header_writer.writeheader()
logger.info(header_buf.getvalue())
self._csv_fieldnames[path] = fieldnames

for payload in data.message:
# TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter
logger.info(json.dumps(asdict(to_scuba_message(payload))))
row_buf = io.StringIO()
row_writer = csv.DictWriter(
row_buf,
fieldnames=all_keys,
extrasaction="ignore",
lineterminator="",
)
for record in records:
row_buf.seek(0)
row_buf.truncate(0)
row_writer.writerow(record)
logger.info(row_buf.getvalue())
elif self.format == "json":
for payload in data.message:
# TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter
logger.info(json.dumps(asdict(to_scuba_message(payload))))
else:
raise ValueError(f"Unsupported format: {self.format!r}")

def shutdown(self) -> None:
    """Shut down the exporter.

    Currently a no-op: nothing is flushed or closed here. File handlers
    created via ``init_logger`` are presumably cleaned up by the logging
    framework itself — TODO(review): confirm no explicit close is needed.
    """
    pass
72 changes: 72 additions & 0 deletions gcm/tests/test_file_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
import csv
from dataclasses import dataclass
from pathlib import Path

from gcm.exporters.file import File
from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams
from gcm.schemas.log import Log


@dataclass
class SamplePayload:
    """Minimal job-shaped payload used to exercise the CSV file exporter."""

    # Three scalar fields give a small, deterministic CSV schema.
    job_id: int
    state: str
    user: str


@dataclass
class OtherSamplePayload:
    """Payload with a deliberately different field set from SamplePayload,
    used to trigger the exporter's CSV schema-change rollover."""

    gpu_uuid: str
    memory_used_mb: int


def test_file_exporter_csv(tmp_path: Path) -> None:
    """Two same-schema writes append data rows under a single CSV header."""
    out_file = tmp_path / "data.csv"
    exporter = File(file_path=str(out_file), format="csv")

    events = [
        (1000, SamplePayload(job_id=1, state="RUNNING", user="alice")),
        (2000, SamplePayload(job_id=2, state="PENDING", user="bob")),
    ]
    for ts, payload in events:
        exporter.write(
            Log(ts=ts, message=[payload]),
            SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC),
        )

    with open(out_file) as handle:
        parsed = list(csv.DictReader(handle))

    assert len(parsed) == 2
    assert (parsed[0]["state"], parsed[0]["user"]) == ("RUNNING", "alice")
    assert (parsed[1]["state"], parsed[1]["user"]) == ("PENDING", "bob")


def test_file_exporter_csv_rolls_over_on_schema_change(tmp_path: Path) -> None:
    """A payload with a different schema is routed to a suffixed sibling file."""
    base_file = tmp_path / "data.csv"
    exporter = File(file_path=str(base_file), format="csv")

    exporter.write(
        Log(ts=1000, message=[SamplePayload(job_id=1, state="RUNNING", user="alice")]),
        SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC),
    )
    exporter.write(
        Log(
            ts=2000,
            message=[OtherSamplePayload(gpu_uuid="GPU-123", memory_used_mb=2048)],
        ),
        SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC),
    )

    # The original file keeps only the first schema's header and row.
    assert base_file.read_text().splitlines() == [
        "job_id,state,user",
        "1,RUNNING,alice",
    ]
    # The mismatching schema lands in a numerically-suffixed rollover file.
    assert (tmp_path / "data_1.csv").read_text().splitlines() == [
        "gpu_uuid,memory_used_mb",
        "GPU-123,2048",
    ]
1 change: 1 addition & 0 deletions website/docs/GCM_Health_Checks/exporters/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file
| Option | Required | Description |
|--------|----------|-------------|
| `file_path` | Yes | Path to the output file |
| `format` | No | `json` (default) or `csv` |

### Basic Usage

Expand Down
15 changes: 15 additions & 0 deletions website/docs/GCM_Monitoring/exporters/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file
| Option | Required | Description |
|--------|----------|-------------|
| `file_path` | Yes | Path to the output file |
| `format` | No | `json` (default) or `csv` |

### Basic Usage

Expand All @@ -36,6 +37,8 @@ sink_opts = [

## Output Format

### JSON (default)

Data is written as newline-delimited JSON (NDJSON), with each monitoring event on a separate line:

```json
Expand All @@ -48,6 +51,18 @@ This format allows for:
- Line-by-line processing with standard tools
- Easy parsing with JSON libraries

### CSV

Use `format=csv` for comma-separated output suitable for spreadsheets and offline analysis:

```shell
gcm slurm_monitor --sink=file --sink-opt file_path=/var/log/gcm/data.csv --sink-opt format=csv --once
```

The first write adds a header row; subsequent writes append data rows. If a later
payload has a different schema, the exporter creates a new file with a numeric
suffix (for example, `data_1.csv`) and writes the new header there.

## Use Cases

### Production Monitoring
Expand Down
Loading