From a6e5845c4c448da9c42e0a4348c996e3c78d2387 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 4 Mar 2026 14:36:23 -0800 Subject: [PATCH 01/15] feat: add telemetry exporter for structured JSON/CSV export (#87) Add a new 'telemetry' sink that periodically appends telemetry snapshots to a local file in JSON or CSV format for offline analysis. - JSON: NDJSON format (one object per line) - CSV: Header on first write, append rows - Options: file_path (required), format (json|csv, default json) - Works with nvml_monitor, slurm_monitor, and health checks Closes #87 Made-with: Cursor --- gcm/exporters/telemetry.py | 59 ++++++++++++++ gcm/tests/test_telemetry_exporter.py | 80 +++++++++++++++++++ .../GCM_Health_Checks/exporters/README.md | 1 + .../GCM_Health_Checks/exporters/telemetry.md | 7 ++ .../docs/GCM_Monitoring/exporters/README.md | 1 + .../GCM_Monitoring/exporters/telemetry.md | 32 ++++++++ 6 files changed, 180 insertions(+) create mode 100644 gcm/exporters/telemetry.py create mode 100644 gcm/tests/test_telemetry_exporter.py create mode 100644 website/docs/GCM_Health_Checks/exporters/telemetry.md create mode 100644 website/docs/GCM_Monitoring/exporters/telemetry.md diff --git a/gcm/exporters/telemetry.py b/gcm/exporters/telemetry.py new file mode 100644 index 0000000..03d68d2 --- /dev/null +++ b/gcm/exporters/telemetry.py @@ -0,0 +1,59 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +"""Structured telemetry export (JSON/CSV) for offline analysis.""" + +import csv +import json +import os +from dataclasses import asdict +from datetime import datetime +from typing import Literal + +from gcm.exporters import register +from gcm.monitoring.dataclass_utils import remove_none_dict_factory +from gcm.monitoring.sink.protocol import SinkAdditionalParams +from gcm.schemas.log import Log + + +def _snapshot(ts: int, msg: object) -> dict: + d = asdict(msg, dict_factory=remove_none_dict_factory) + d["timestamp"] = datetime.utcfromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S") + return d + + +@register("telemetry") +class Telemetry: + """Append telemetry snapshots to a file in JSON or CSV format.""" + + def __init__( + self, + *, + file_path: str, + format: Literal["json", "csv"] = "json", + ) -> None: + self.file_path = file_path + self.format = format + self._header_written = False + + def write( + self, + data: Log, + additional_params: SinkAdditionalParams, + ) -> None: + records = [_snapshot(data.ts, m) for m in data.message] + if not records: + return + os.makedirs(os.path.dirname(self.file_path) or ".", exist_ok=True) + with open(self.file_path, "a") as f: + if self.format == "json": + for r in records: + f.write(json.dumps(r) + "\n") + else: + all_keys = ["timestamp"] + sorted( + {k for r in records for k in r.keys()} - {"timestamp"} + ) + w = csv.DictWriter(f, fieldnames=all_keys, extrasaction="ignore") + if not self._header_written: + w.writeheader() + self._header_written = True + w.writerows(records) diff --git a/gcm/tests/test_telemetry_exporter.py b/gcm/tests/test_telemetry_exporter.py new file mode 100644 index 0000000..d74d778 --- /dev/null +++ b/gcm/tests/test_telemetry_exporter.py @@ -0,0 +1,80 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +import csv +import json + +from gcm.exporters.telemetry import Telemetry +from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams +from gcm.schemas.device_metrics import DevicePlusJobMetrics +from gcm.schemas.log import Log + + +def test_telemetry_json(tmp_path) -> None: + path = tmp_path / "telemetry.json" + sink = Telemetry(file_path=str(path), format="json") + msg = DevicePlusJobMetrics( + gpu_id=3, + hostname="node-42", + job_id=91283, + job_user="research_team", + gpu_util=88, + mem_used_percent=90, + temperature=78, + power_draw=310, + retired_pages_count_single_bit=0, + retired_pages_count_double_bit=0, + ) + sink.write( + Log(ts=1741114282, message=[msg]), + SinkAdditionalParams(data_type=DataType.LOG), + ) + lines = path.read_text().strip().split("\n") + assert len(lines) == 1 + data = json.loads(lines[0]) + assert data["timestamp"] == "2025-03-04T21:31:22" + assert data["hostname"] == "node-42" + assert data["gpu_id"] == 3 + assert data["job_id"] == 91283 + assert data["job_user"] == "research_team" + assert data["gpu_util"] == 88 + assert data["temperature"] == 78 + assert data["power_draw"] == 310 + + +def test_telemetry_csv(tmp_path) -> None: + path = tmp_path / "telemetry.csv" + sink = Telemetry(file_path=str(path), format="csv") + msg = DevicePlusJobMetrics( + gpu_id=0, + hostname="node-1", + job_id=100, + job_user="user", + gpu_util=50, + temperature=65, + power_draw=200, + ) + sink.write( + Log(ts=1741114282, message=[msg]), + SinkAdditionalParams(data_type=DataType.LOG), + ) + with open(path) as f: + reader = csv.DictReader(f) + rows = list(reader) + assert len(rows) == 1 + assert rows[0]["timestamp"] == "2025-03-04T21:31:22" + assert rows[0]["hostname"] == "node-1" + assert rows[0]["gpu_id"] == "0" + assert rows[0]["gpu_util"] == "50" + + +def test_telemetry_csv_append(tmp_path) -> None: + path = tmp_path / "telemetry.csv" + sink = Telemetry(file_path=str(path), format="csv") + msg = DevicePlusJobMetrics(gpu_id=0, hostname="n1", gpu_util=10) + sink.write(Log(ts=1000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG)) + sink.write(Log(ts=2000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG)) + with open(path) as f: + rows = list(csv.DictReader(f)) + assert len(rows) == 2 + assert rows[0]["timestamp"] == "1970-01-01T00:16:40" + assert rows[1]["timestamp"] == "1970-01-01T00:33:20" diff --git a/website/docs/GCM_Health_Checks/exporters/README.md b/website/docs/GCM_Health_Checks/exporters/README.md index cecd00d..3767d25 100644 --- a/website/docs/GCM_Health_Checks/exporters/README.md +++ b/website/docs/GCM_Health_Checks/exporters/README.md @@ -25,6 +25,7 @@ GCM includes several built-in exporters for different use cases: | [Graph API](graph_api.md) | `graph_api` | Meta's internal backends | | [OpenTelemetry](otel.md) | `otel` | OTLP-compatible backends | | [Stdout](stdout.md) | `stdout` | Terminal output | +| [Telemetry](telemetry.md) | `telemetry` | Structured JSON/CSV for offline analysis | | [Webhook](webhook.md) | `webhook` | HTTP endpoint forwarding | ## Plugin System diff --git a/website/docs/GCM_Health_Checks/exporters/telemetry.md b/website/docs/GCM_Health_Checks/exporters/telemetry.md new file mode 100644 index 0000000..c4fcaae --- /dev/null +++ b/website/docs/GCM_Health_Checks/exporters/telemetry.md @@ -0,0 +1,7 @@ +--- +sidebar_position: 7 +--- + +# Telemetry + +The Telemetry exporter appends structured telemetry snapshots to a local file in JSON or CSV format for offline analysis. See [GCM Monitoring Telemetry exporter](../../GCM_Monitoring/exporters/telemetry.md) for full documentation. diff --git a/website/docs/GCM_Monitoring/exporters/README.md b/website/docs/GCM_Monitoring/exporters/README.md index 05c8e07..8027e68 100644 --- a/website/docs/GCM_Monitoring/exporters/README.md +++ b/website/docs/GCM_Monitoring/exporters/README.md @@ -25,6 +25,7 @@ GCM includes several built-in exporters for different use cases: | [Graph API](graph_api.md) | `graph_api` | Meta's internal backends | | [OpenTelemetry](otel.md) | `otel` | OTLP-compatible backends | | [Stdout](stdout.md) | `stdout` | Terminal output | +| [Telemetry](telemetry.md) | `telemetry` | Structured JSON/CSV for offline analysis | | [Webhook](webhook.md) | `webhook` | HTTP endpoint forwarding | ## Plugin System diff --git a/website/docs/GCM_Monitoring/exporters/telemetry.md b/website/docs/GCM_Monitoring/exporters/telemetry.md new file mode 100644 index 0000000..6cb199a --- /dev/null +++ b/website/docs/GCM_Monitoring/exporters/telemetry.md @@ -0,0 +1,32 @@ +--- +sidebar_position: 7 +--- + +# Telemetry + +The Telemetry exporter appends structured telemetry snapshots to a local file in JSON or CSV format for offline analysis. + +## Configuration + +| Option | Required | Description | +|--------|----------|-------------| +| `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | + +## Usage + +```shell +# JSON (NDJSON, one object per line) +gcm nvml_monitor --sink=telemetry --sink-opt file_path=/var/log/gcm/telemetry.json --once + +# CSV +gcm nvml_monitor --sink=telemetry --sink-opt file_path=/var/log/gcm/telemetry.csv --sink-opt format=csv --once +``` + +## Output + +Each snapshot adds a timestamp and writes one record per GPU. Example JSON: + +```json +{"timestamp": "2026-03-04T21:31:22", "hostname": "node-42", "gpu_id": 3, "job_id": 91283, "job_user": "research_team", "gpu_util": 88, "mem_used_percent": 71, "temperature": 78, "power_draw": 310, "retired_pages_count_single_bit": 0, "retired_pages_count_double_bit": 0} +``` From d16392fed755cb1990682d2d5b75048645f3dc3f Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 4 Mar 2026 14:38:25 -0800 Subject: [PATCH 02/15] fix: correct expected UTC timestamp in telemetry exporter tests Made-with: Cursor --- gcm/tests/test_telemetry_exporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gcm/tests/test_telemetry_exporter.py b/gcm/tests/test_telemetry_exporter.py index d74d778..2f0d090 100644 --- a/gcm/tests/test_telemetry_exporter.py +++ b/gcm/tests/test_telemetry_exporter.py @@ -31,7 +31,7 @@ def test_telemetry_json(tmp_path) -> None: lines = path.read_text().strip().split("\n") assert len(lines) == 1 data = json.loads(lines[0]) - assert data["timestamp"] == "2025-03-04T21:31:22" + assert data["timestamp"] == "2025-03-04T18:51:22" # UTC for ts=1741114282 assert data["hostname"] == "node-42" assert data["gpu_id"] == 3 assert data["job_id"] == 91283 @@ -61,7 +61,7 @@ def test_telemetry_csv(tmp_path) -> None: reader = csv.DictReader(f) rows = list(reader) assert len(rows) == 1 - assert rows[0]["timestamp"] == "2025-03-04T21:31:22" + assert rows[0]["timestamp"] == "2025-03-04T18:51:22" # UTC for ts=1741114282 assert rows[0]["hostname"] == "node-1" assert rows[0]["gpu_id"] == "0" assert rows[0]["gpu_util"] == "50" From e3f37d8b1c0ab468493b822e2ceabdc1a6748eb4 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 4 Mar 2026 14:48:12 -0800 Subject: [PATCH 03/15] fix: satisfy nox format and typecheck --- gcm/exporters/telemetry.py | 12 ++++++++++-- gcm/tests/test_telemetry_exporter.py | 15 ++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/gcm/exporters/telemetry.py b/gcm/exporters/telemetry.py index 03d68d2..6b10b6b 100644 --- a/gcm/exporters/telemetry.py +++ b/gcm/exporters/telemetry.py @@ -2,21 +2,29 @@ # All rights reserved. """Structured telemetry export (JSON/CSV) for offline analysis.""" +from __future__ import annotations + import csv import json import os from dataclasses import asdict from datetime import datetime -from typing import Literal +from typing import cast, Literal, TYPE_CHECKING from gcm.exporters import register + +if TYPE_CHECKING: + from _typeshed import DataclassInstance from gcm.monitoring.dataclass_utils import remove_none_dict_factory from gcm.monitoring.sink.protocol import SinkAdditionalParams from gcm.schemas.log import Log def _snapshot(ts: int, msg: object) -> dict: - d = asdict(msg, dict_factory=remove_none_dict_factory) + d = asdict( + cast("DataclassInstance", msg), + dict_factory=remove_none_dict_factory, + ) d["timestamp"] = datetime.utcfromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S") return d diff --git a/gcm/tests/test_telemetry_exporter.py b/gcm/tests/test_telemetry_exporter.py index 2f0d090..f7219c0 100644 --- a/gcm/tests/test_telemetry_exporter.py +++ b/gcm/tests/test_telemetry_exporter.py @@ -2,6 +2,7 @@ # All rights reserved. import csv import json +from pathlib import Path from gcm.exporters.telemetry import Telemetry from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams @@ -9,7 +10,7 @@ from gcm.schemas.log import Log -def test_telemetry_json(tmp_path) -> None: +def test_telemetry_json(tmp_path: Path) -> None: path = tmp_path / "telemetry.json" sink = Telemetry(file_path=str(path), format="json") msg = DevicePlusJobMetrics( @@ -41,7 +42,7 @@ def test_telemetry_json(tmp_path) -> None: assert data["power_draw"] == 310 -def test_telemetry_csv(tmp_path) -> None: +def test_telemetry_csv(tmp_path: Path) -> None: path = tmp_path / "telemetry.csv" sink = Telemetry(file_path=str(path), format="csv") msg = DevicePlusJobMetrics( @@ -67,12 +68,16 @@ def test_telemetry_csv(tmp_path) -> None: assert rows[0]["gpu_util"] == "50" -def test_telemetry_csv_append(tmp_path) -> None: +def test_telemetry_csv_append(tmp_path: Path) -> None: path = tmp_path / "telemetry.csv" sink = Telemetry(file_path=str(path), format="csv") msg = DevicePlusJobMetrics(gpu_id=0, hostname="n1", gpu_util=10) - sink.write(Log(ts=1000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG)) - sink.write(Log(ts=2000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG)) + sink.write( + Log(ts=1000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG) + ) + sink.write( + Log(ts=2000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG) + ) with open(path) as f: rows = list(csv.DictReader(f)) assert len(rows) == 2 From 4f00cf72c965e6adc7aed4a0d7b8ec22cc46f02f Mon Sep 17 00:00:00 2001 From: Achintya P Date: Sat, 7 Mar 2026 23:26:05 -0800 Subject: [PATCH 04/15] changed to supporting csv --- gcm/exporters/file.py | 76 ++++++++++++++----- gcm/tests/test_file_exporter.py | 36 +++++++++ .../docs/GCM_Health_Checks/exporters/file.md | 1 + website/docs/GCM_Monitoring/exporters/file.md | 13 ++++ 4 files changed, 107 insertions(+), 19 deletions(-) create mode 100644 gcm/tests/test_file_exporter.py diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index 3108046..cf1d3be 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -1,24 +1,37 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. +from __future__ import annotations + +import csv import json +import logging import os from dataclasses import asdict -from typing import Callable, Optional, Tuple +from typing import Any, Callable, cast, Dict, Literal, Optional, Tuple, TYPE_CHECKING from gcm.exporters import register - +from gcm.monitoring.dataclass_utils import asdict_recursive from gcm.monitoring.meta_utils.scuba import to_scuba_message from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams - from gcm.monitoring.utils.monitor import init_logger from gcm.schemas.log import Log +if TYPE_CHECKING: + from _typeshed import DataclassInstance + split_path: Callable[[str], Tuple[str, str]] = lambda path: ( os.path.dirname(path), os.path.basename(path), ) +def _flatten_for_csv(payload: object) -> Dict[str, Any]: + """Flatten scuba message dict for CSV output.""" + scuba = asdict(to_scuba_message(cast("DataclassInstance", payload))) + flat = asdict_recursive(scuba) + return flat if isinstance(flat, dict) else {} + + @register("file") class File: """Write data to file.""" @@ -29,16 +42,23 @@ def __init__( file_path: Optional[str] = None, job_file_path: Optional[str] = None, node_file_path: Optional[str] = None, + format: Literal["json", "csv"] = "json", ): if all(path is None for path in [file_path, job_file_path, node_file_path]): raise Exception( "When using the file sink at least one file_path needs to be specified. See gcm %collector% --help" ) - self.data_identifier_to_logger_map = {} + self.format = format + self._csv_header_written: Dict[str, bool] = {} + self.data_identifier_to_logger_map: Dict[ + DataIdentifier, Optional[logging.Logger] + ] = {} + self._data_identifier_to_path: Dict[DataIdentifier, str] = {} if file_path is not None: file_directory, file_name = split_path(file_path) + self._data_identifier_to_path[DataIdentifier.GENERIC] = file_path self.data_identifier_to_logger_map[DataIdentifier.GENERIC], _ = init_logger( logger_name=__name__ + file_path, log_dir=file_directory, @@ -48,6 +68,7 @@ def __init__( if job_file_path is not None: file_directory, file_name = split_path(job_file_path) + self._data_identifier_to_path[DataIdentifier.JOB] = job_file_path self.data_identifier_to_logger_map[DataIdentifier.JOB], _ = init_logger( logger_name=__name__ + job_file_path, log_dir=file_directory, @@ -57,6 +78,7 @@ def __init__( if node_file_path is not None: file_directory, file_name = split_path(node_file_path) + self._data_identifier_to_path[DataIdentifier.NODE] = node_file_path self.data_identifier_to_logger_map[DataIdentifier.NODE], _ = init_logger( logger_name=__name__ + node_file_path, log_dir=file_directory, @@ -70,24 +92,40 @@ def write( additional_params: SinkAdditionalParams, ) -> None: - # update file path if data_identifier is present on additional_params - if additional_params.data_identifier: - data_identifier = additional_params.data_identifier - if data_identifier not in self.data_identifier_to_logger_map: - raise AssertionError( - f"data_identifier value is unsupported on file sink: {data_identifier}" - ) - if self.data_identifier_to_logger_map[data_identifier] is None: + data_identifier = additional_params.data_identifier or DataIdentifier.GENERIC + if data_identifier not in self.data_identifier_to_logger_map: + raise AssertionError( + f"data_identifier value is unsupported on file sink: {data_identifier}" + ) + if self.data_identifier_to_logger_map[data_identifier] is None: + raise AssertionError( + f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help" + ) + logger = self.data_identifier_to_logger_map[data_identifier] + assert logger is not None + + if self.format == "csv": + path = self._data_identifier_to_path.get(data_identifier) + if path is None: raise AssertionError( - f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help" + "CSV format requires data_identifier to match a configured path" ) - logger = self.data_identifier_to_logger_map[data_identifier] + records = [_flatten_for_csv(p) for p in data.message] + if not records: + return + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + all_keys = sorted({k for r in records for k in r.keys()}) + header_done = self._csv_header_written.get(path, False) + with open(path, "a") as f: + w = csv.DictWriter(f, fieldnames=all_keys, extrasaction="ignore") + if not header_done: + w.writeheader() + self._csv_header_written[path] = True + w.writerows(records) else: - logger = self.data_identifier_to_logger_map[DataIdentifier.GENERIC] - - for payload in data.message: - # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter - logger.info(json.dumps(asdict(to_scuba_message(payload)))) + for payload in data.message: + # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter + logger.info(json.dumps(asdict(to_scuba_message(payload)))) def shutdown(self) -> None: pass diff --git a/gcm/tests/test_file_exporter.py b/gcm/tests/test_file_exporter.py new file mode 100644 index 0000000..bac6668 --- /dev/null +++ b/gcm/tests/test_file_exporter.py @@ -0,0 +1,36 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +import csv +from dataclasses import dataclass +from pathlib import Path + +from gcm.exporters.file import File +from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams +from gcm.schemas.log import Log + + +@dataclass +class SamplePayload: + job_id: int + state: str + user: str + + +def test_file_exporter_csv(tmp_path: Path) -> None: + path = tmp_path / "data.csv" + sink = File(file_path=str(path), format="csv") + sink.write( + Log(ts=1000, message=[SamplePayload(job_id=1, state="RUNNING", user="alice")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + sink.write( + Log(ts=2000, message=[SamplePayload(job_id=2, state="PENDING", user="bob")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + with open(path) as f: + rows = list(csv.DictReader(f)) + assert len(rows) == 2 + assert rows[0]["normal.state"] == "RUNNING" + assert rows[0]["normal.user"] == "alice" + assert rows[1]["normal.state"] == "PENDING" + assert rows[1]["normal.user"] == "bob" diff --git a/website/docs/GCM_Health_Checks/exporters/file.md b/website/docs/GCM_Health_Checks/exporters/file.md index 8a8e122..38e7c87 100644 --- a/website/docs/GCM_Health_Checks/exporters/file.md +++ b/website/docs/GCM_Health_Checks/exporters/file.md @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file | Option | Required | Description | |--------|----------|-------------| | `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | ### Basic Usage diff --git a/website/docs/GCM_Monitoring/exporters/file.md b/website/docs/GCM_Monitoring/exporters/file.md index 8a8e122..99f7ae7 100644 --- a/website/docs/GCM_Monitoring/exporters/file.md +++ b/website/docs/GCM_Monitoring/exporters/file.md @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file | Option | Required | Description | |--------|----------|-------------| | `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | ### Basic Usage @@ -36,6 +37,8 @@ sink_opts = [ ## Output Format +### JSON (default) + Data is written as newline-delimited JSON (NDJSON), with each monitoring event on a separate line: ```json @@ -48,6 +51,16 @@ This format allows for: - Line-by-line processing with standard tools - Easy parsing with JSON libraries +### CSV + +Use `format=csv` for comma-separated output suitable for spreadsheets and offline analysis: + +```shell +gcm slurm_monitor --sink=file --sink-opt file_path=/var/log/gcm/data.csv --sink-opt format=csv --once +``` + +The first write adds a header row; subsequent writes append data rows. + ## Use Cases ### Production Monitoring From 3f82227b3fb2091c89c74171ccff308097bffa1d Mon Sep 17 00:00:00 2001 From: Achintya P Date: Sat, 7 Mar 2026 23:32:20 -0800 Subject: [PATCH 05/15] added the shutdown for the nox checking --- gcm/exporters/telemetry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gcm/exporters/telemetry.py b/gcm/exporters/telemetry.py index 6b10b6b..15cbcdb 100644 --- a/gcm/exporters/telemetry.py +++ b/gcm/exporters/telemetry.py @@ -65,3 +65,6 @@ def write( w.writeheader() self._header_written = True w.writerows(records) + + def shutdown(self) -> None: + pass From 1b51ad74483c84d7708398bcebe9a1d9c40b6c09 Mon Sep 17 00:00:00 2001 From: theap06 Date: Sun, 8 Mar 2026 16:59:44 -0700 Subject: [PATCH 06/15] Update gcm/exporters/file.py Co-authored-by: lucca bertoncini <32229669+luccabb@users.noreply.github.com> --- gcm/exporters/file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index cf1d3be..77fd7c0 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -122,7 +122,7 @@ def write( w.writeheader() self._csv_header_written[path] = True w.writerows(records) - else: + elif format == 'json': for payload in data.message: # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter logger.info(json.dumps(asdict(to_scuba_message(payload)))) From 184a18e938c59a8369749be3e645f62fedfacc00 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Sun, 8 Mar 2026 17:03:21 -0700 Subject: [PATCH 07/15] fixed the telemetry file and added the final else --- gcm/exporters/file.py | 4 +- gcm/exporters/telemetry.py | 70 --------------- gcm/tests/test_telemetry_exporter.py | 85 ------------------- .../GCM_Health_Checks/exporters/README.md | 1 - .../GCM_Health_Checks/exporters/telemetry.md | 7 -- .../docs/GCM_Monitoring/exporters/README.md | 1 - .../GCM_Monitoring/exporters/telemetry.md | 32 ------- 7 files changed, 3 insertions(+), 197 deletions(-) delete mode 100644 gcm/exporters/telemetry.py delete mode 100644 gcm/tests/test_telemetry_exporter.py delete mode 100644 website/docs/GCM_Health_Checks/exporters/telemetry.md delete mode 100644 website/docs/GCM_Monitoring/exporters/telemetry.md diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index 77fd7c0..d3a2448 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -122,10 +122,12 @@ def write( w.writeheader() self._csv_header_written[path] = True w.writerows(records) - elif format == 'json': + elif self.format == "json": for payload in data.message: # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter logger.info(json.dumps(asdict(to_scuba_message(payload)))) + else: + raise ValueError(f"Unsupported format: {self.format!r}") def shutdown(self) -> None: pass diff --git a/gcm/exporters/telemetry.py b/gcm/exporters/telemetry.py deleted file mode 100644 index 15cbcdb..0000000 --- a/gcm/exporters/telemetry.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -"""Structured telemetry export (JSON/CSV) for offline analysis.""" - -from __future__ import annotations - -import csv -import json -import os -from dataclasses import asdict -from datetime import datetime -from typing import cast, Literal, TYPE_CHECKING - -from gcm.exporters import register - -if TYPE_CHECKING: - from _typeshed import DataclassInstance -from gcm.monitoring.dataclass_utils import remove_none_dict_factory -from gcm.monitoring.sink.protocol import SinkAdditionalParams -from gcm.schemas.log import Log - - -def _snapshot(ts: int, msg: object) -> dict: - d = asdict( - cast("DataclassInstance", msg), - dict_factory=remove_none_dict_factory, - ) - d["timestamp"] = datetime.utcfromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S") - return d - - -@register("telemetry") -class Telemetry: - """Append telemetry snapshots to a file in JSON or CSV format.""" - - def __init__( - self, - *, - file_path: str, - format: Literal["json", "csv"] = "json", - ) -> None: - self.file_path = file_path - self.format = format - self._header_written = False - - def write( - self, - data: Log, - additional_params: SinkAdditionalParams, - ) -> None: - records = [_snapshot(data.ts, m) for m in data.message] - if not records: - return - os.makedirs(os.path.dirname(self.file_path) or ".", exist_ok=True) - with open(self.file_path, "a") as f: - if self.format == "json": - for r in records: - f.write(json.dumps(r) + "\n") - else: - all_keys = ["timestamp"] + sorted( - {k for r in records for k in r.keys()} - {"timestamp"} - ) - w = csv.DictWriter(f, fieldnames=all_keys, extrasaction="ignore") - if not self._header_written: - w.writeheader() - self._header_written = True - w.writerows(records) - - def shutdown(self) -> None: - pass diff --git a/gcm/tests/test_telemetry_exporter.py b/gcm/tests/test_telemetry_exporter.py deleted file mode 100644 index f7219c0..0000000 --- a/gcm/tests/test_telemetry_exporter.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -import csv -import json -from pathlib import Path - -from gcm.exporters.telemetry import Telemetry -from gcm.monitoring.sink.protocol import DataType, SinkAdditionalParams -from gcm.schemas.device_metrics import DevicePlusJobMetrics -from gcm.schemas.log import Log - - -def test_telemetry_json(tmp_path: Path) -> None: - path = tmp_path / "telemetry.json" - sink = Telemetry(file_path=str(path), format="json") - msg = DevicePlusJobMetrics( - gpu_id=3, - hostname="node-42", - job_id=91283, - job_user="research_team", - gpu_util=88, - mem_used_percent=90, - temperature=78, - power_draw=310, - retired_pages_count_single_bit=0, - retired_pages_count_double_bit=0, - ) - sink.write( - Log(ts=1741114282, message=[msg]), - SinkAdditionalParams(data_type=DataType.LOG), - ) - lines = path.read_text().strip().split("\n") - assert len(lines) == 1 - data = json.loads(lines[0]) - assert data["timestamp"] == "2025-03-04T18:51:22" # UTC for ts=1741114282 - assert data["hostname"] == "node-42" - assert data["gpu_id"] == 3 - assert data["job_id"] == 91283 - assert data["job_user"] == "research_team" - assert data["gpu_util"] == 88 - assert data["temperature"] == 78 - assert data["power_draw"] == 310 - - -def test_telemetry_csv(tmp_path: Path) -> None: - path = tmp_path / "telemetry.csv" - sink = Telemetry(file_path=str(path), format="csv") - msg = DevicePlusJobMetrics( - gpu_id=0, - hostname="node-1", - job_id=100, - job_user="user", - gpu_util=50, - temperature=65, - power_draw=200, - ) - sink.write( - Log(ts=1741114282, message=[msg]), - SinkAdditionalParams(data_type=DataType.LOG), - ) - with open(path) as f: - reader = csv.DictReader(f) - rows = list(reader) - assert len(rows) == 1 - assert rows[0]["timestamp"] == "2025-03-04T18:51:22" # UTC for ts=1741114282 - assert rows[0]["hostname"] == "node-1" - assert rows[0]["gpu_id"] == "0" - assert rows[0]["gpu_util"] == "50" - - -def test_telemetry_csv_append(tmp_path: Path) -> None: - path = tmp_path / "telemetry.csv" - sink = Telemetry(file_path=str(path), format="csv") - msg = DevicePlusJobMetrics(gpu_id=0, hostname="n1", gpu_util=10) - sink.write( - Log(ts=1000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG) - ) - sink.write( - Log(ts=2000, message=[msg]), SinkAdditionalParams(data_type=DataType.LOG) - ) - with open(path) as f: - rows = list(csv.DictReader(f)) - assert len(rows) == 2 - assert rows[0]["timestamp"] == "1970-01-01T00:16:40" - assert rows[1]["timestamp"] == "1970-01-01T00:33:20" diff --git a/website/docs/GCM_Health_Checks/exporters/README.md b/website/docs/GCM_Health_Checks/exporters/README.md index 3767d25..cecd00d 100644 --- a/website/docs/GCM_Health_Checks/exporters/README.md +++ b/website/docs/GCM_Health_Checks/exporters/README.md @@ -25,7 +25,6 @@ GCM includes several built-in exporters for different use cases: | [Graph API](graph_api.md) | `graph_api` | Meta's internal backends | | [OpenTelemetry](otel.md) | `otel` | OTLP-compatible backends | | [Stdout](stdout.md) | `stdout` | Terminal output | -| [Telemetry](telemetry.md) | `telemetry` | Structured JSON/CSV for offline analysis | | [Webhook](webhook.md) | `webhook` | HTTP endpoint forwarding | ## Plugin System diff --git a/website/docs/GCM_Health_Checks/exporters/telemetry.md b/website/docs/GCM_Health_Checks/exporters/telemetry.md deleted file mode 100644 index c4fcaae..0000000 --- a/website/docs/GCM_Health_Checks/exporters/telemetry.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -sidebar_position: 7 ---- - -# Telemetry - -The Telemetry exporter appends structured telemetry snapshots to a local file in JSON or CSV format for offline analysis. See [GCM Monitoring Telemetry exporter](../../GCM_Monitoring/exporters/telemetry.md) for full documentation. diff --git a/website/docs/GCM_Monitoring/exporters/README.md b/website/docs/GCM_Monitoring/exporters/README.md index 8027e68..05c8e07 100644 --- a/website/docs/GCM_Monitoring/exporters/README.md +++ b/website/docs/GCM_Monitoring/exporters/README.md @@ -25,7 +25,6 @@ GCM includes several built-in exporters for different use cases: | [Graph API](graph_api.md) | `graph_api` | Meta's internal backends | | [OpenTelemetry](otel.md) | `otel` | OTLP-compatible backends | | [Stdout](stdout.md) | `stdout` | Terminal output | -| [Telemetry](telemetry.md) | `telemetry` | Structured JSON/CSV for offline analysis | | [Webhook](webhook.md) | `webhook` | HTTP endpoint forwarding | ## Plugin System diff --git a/website/docs/GCM_Monitoring/exporters/telemetry.md b/website/docs/GCM_Monitoring/exporters/telemetry.md deleted file mode 100644 index 6cb199a..0000000 --- a/website/docs/GCM_Monitoring/exporters/telemetry.md +++ /dev/null @@ -1,32 +0,0 @@ ---- -sidebar_position: 7 ---- - -# Telemetry - -The Telemetry exporter appends structured telemetry snapshots to a local file in JSON or CSV format for offline analysis. - -## Configuration - -| Option | Required | Description | -|--------|----------|-------------| -| `file_path` | Yes | Path to the output file | -| `format` | No | `json` (default) or `csv` | - -## Usage - -```shell -# JSON (NDJSON, one object per line) -gcm nvml_monitor --sink=telemetry --sink-opt file_path=/var/log/gcm/telemetry.json --once - -# CSV -gcm nvml_monitor --sink=telemetry --sink-opt file_path=/var/log/gcm/telemetry.csv --sink-opt format=csv --once -``` - -## Output - -Each snapshot adds a timestamp and writes one record per GPU. Example JSON: - -```json -{"timestamp": "2026-03-04T21:31:22", "hostname": "node-42", "gpu_id": 3, "job_id": 91283, "job_user": "research_team", "gpu_util": 88, "mem_used_percent": 71, "temperature": 78, "power_draw": 310, "retired_pages_count_single_bit": 0, "retired_pages_count_double_bit": 0} -``` From 961b5b72e3004e61b068673040d51b9f2864cae7 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Mon, 9 Mar 2026 02:42:04 -0700 Subject: [PATCH 08/15] feat: add CSV format support to file exporter - Add format option (json|csv) to file exporter - CSV output uses flattened scuba message structure - First write adds header row, subsequent writes append - Update docs for both Monitoring and Health Checks Made-with: Cursor --- gcm/exporters/file.py | 78 ++++++++++++++----- gcm/tests/test_file_exporter.py | 36 +++++++++ .../docs/GCM_Health_Checks/exporters/file.md | 1 + website/docs/GCM_Monitoring/exporters/file.md | 13 ++++ 4 files changed, 109 insertions(+), 19 deletions(-) create mode 100644 gcm/tests/test_file_exporter.py diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index 3108046..d3a2448 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -1,24 +1,37 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. +from __future__ import annotations + +import csv import json +import logging import os from dataclasses import asdict -from typing import Callable, Optional, Tuple +from typing import Any, Callable, cast, Dict, Literal, Optional, Tuple, TYPE_CHECKING from gcm.exporters import register - +from gcm.monitoring.dataclass_utils import asdict_recursive from gcm.monitoring.meta_utils.scuba import to_scuba_message from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams - from gcm.monitoring.utils.monitor import init_logger from gcm.schemas.log import Log +if TYPE_CHECKING: + from _typeshed import DataclassInstance + split_path: Callable[[str], Tuple[str, str]] = lambda path: ( os.path.dirname(path), os.path.basename(path), ) +def _flatten_for_csv(payload: object) -> Dict[str, Any]: + """Flatten scuba message dict for CSV output.""" + scuba = asdict(to_scuba_message(cast("DataclassInstance", payload))) + flat = asdict_recursive(scuba) + return flat if isinstance(flat, dict) else {} + + @register("file") class File: """Write data to file.""" @@ -29,16 +42,23 @@ def __init__( file_path: Optional[str] = None, job_file_path: Optional[str] = None, node_file_path: Optional[str] = None, + format: Literal["json", "csv"] = "json", ): if all(path is None for path in [file_path, job_file_path, node_file_path]): raise Exception( "When using the file sink at least one file_path needs to be specified. See gcm %collector% --help" ) - self.data_identifier_to_logger_map = {} + self.format = format + self._csv_header_written: Dict[str, bool] = {} + self.data_identifier_to_logger_map: Dict[ + DataIdentifier, Optional[logging.Logger] + ] = {} + self._data_identifier_to_path: Dict[DataIdentifier, str] = {} if file_path is not None: file_directory, file_name = split_path(file_path) + self._data_identifier_to_path[DataIdentifier.GENERIC] = file_path self.data_identifier_to_logger_map[DataIdentifier.GENERIC], _ = init_logger( logger_name=__name__ + file_path, log_dir=file_directory, @@ -48,6 +68,7 @@ def __init__( if job_file_path is not None: file_directory, file_name = split_path(job_file_path) + self._data_identifier_to_path[DataIdentifier.JOB] = job_file_path self.data_identifier_to_logger_map[DataIdentifier.JOB], _ = init_logger( logger_name=__name__ + job_file_path, log_dir=file_directory, @@ -57,6 +78,7 @@ def __init__( if node_file_path is not None: file_directory, file_name = split_path(node_file_path) + self._data_identifier_to_path[DataIdentifier.NODE] = node_file_path self.data_identifier_to_logger_map[DataIdentifier.NODE], _ = init_logger( logger_name=__name__ + node_file_path, log_dir=file_directory, @@ -70,24 +92,42 @@ def write( additional_params: SinkAdditionalParams, ) -> None: - # update file path if data_identifier is present on additional_params - if additional_params.data_identifier: - data_identifier = additional_params.data_identifier - if data_identifier not in self.data_identifier_to_logger_map: - raise AssertionError( - f"data_identifier value is unsupported on file sink: {data_identifier}" - ) - if self.data_identifier_to_logger_map[data_identifier] is None: + data_identifier = additional_params.data_identifier or DataIdentifier.GENERIC + if data_identifier not in self.data_identifier_to_logger_map: + raise AssertionError( + f"data_identifier value is unsupported on file sink: {data_identifier}" + ) + if self.data_identifier_to_logger_map[data_identifier] is None: + raise AssertionError( + f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help" + ) + logger = self.data_identifier_to_logger_map[data_identifier] + assert logger is not None + + if self.format == "csv": + path = self._data_identifier_to_path.get(data_identifier) + if path is None: raise AssertionError( - f"The sink is missing a required param for the following data_identifier: {data_identifier}. See gcm %collector% --help" + "CSV format requires data_identifier to match a configured path" ) - logger = self.data_identifier_to_logger_map[data_identifier] + records = [_flatten_for_csv(p) for p in data.message] + if not records: + return + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + all_keys = sorted({k for r in records for k in r.keys()}) + header_done = self._csv_header_written.get(path, False) + with open(path, "a") as f: + w = csv.DictWriter(f, fieldnames=all_keys, extrasaction="ignore") + if not header_done: + w.writeheader() + self._csv_header_written[path] = True + w.writerows(records) + elif self.format == "json": + for payload in data.message: + # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter + logger.info(json.dumps(asdict(to_scuba_message(payload)))) else: - logger = self.data_identifier_to_logger_map[DataIdentifier.GENERIC] - - for payload in data.message: - # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter - logger.info(json.dumps(asdict(to_scuba_message(payload)))) + raise ValueError(f"Unsupported format: {self.format!r}") def shutdown(self) -> None: pass diff --git a/gcm/tests/test_file_exporter.py b/gcm/tests/test_file_exporter.py new file mode 100644 index 0000000..bac6668 --- /dev/null +++ b/gcm/tests/test_file_exporter.py @@ -0,0 +1,36 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +import csv +from dataclasses import dataclass +from pathlib import Path + +from gcm.exporters.file import File +from gcm.monitoring.sink.protocol import DataIdentifier, SinkAdditionalParams +from gcm.schemas.log import Log + + +@dataclass +class SamplePayload: + job_id: int + state: str + user: str + + +def test_file_exporter_csv(tmp_path: Path) -> None: + path = tmp_path / "data.csv" + sink = File(file_path=str(path), format="csv") + sink.write( + Log(ts=1000, message=[SamplePayload(job_id=1, state="RUNNING", user="alice")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + sink.write( + Log(ts=2000, message=[SamplePayload(job_id=2, state="PENDING", user="bob")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + with open(path) as f: + rows = list(csv.DictReader(f)) + assert len(rows) == 2 + assert rows[0]["normal.state"] == "RUNNING" + assert rows[0]["normal.user"] == "alice" + assert rows[1]["normal.state"] == "PENDING" + assert rows[1]["normal.user"] == "bob" diff --git a/website/docs/GCM_Health_Checks/exporters/file.md b/website/docs/GCM_Health_Checks/exporters/file.md index 8a8e122..38e7c87 100644 --- a/website/docs/GCM_Health_Checks/exporters/file.md +++ b/website/docs/GCM_Health_Checks/exporters/file.md @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file | Option | Required | Description | |--------|----------|-------------| | `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | ### Basic Usage diff --git a/website/docs/GCM_Monitoring/exporters/file.md b/website/docs/GCM_Monitoring/exporters/file.md index 8a8e122..99f7ae7 100644 --- a/website/docs/GCM_Monitoring/exporters/file.md +++ b/website/docs/GCM_Monitoring/exporters/file.md @@ -13,6 +13,7 @@ The File exporter writes monitoring data and health check results to local file | Option | Required | Description | |--------|----------|-------------| | `file_path` | Yes | Path to the output file | +| `format` | No | `json` (default) or `csv` | ### Basic Usage @@ -36,6 +37,8 @@ sink_opts = [ ## Output Format +### JSON (default) + Data is written as newline-delimited JSON (NDJSON), with each monitoring event on a separate line: ```json @@ -48,6 +51,16 @@ This format allows for: - Line-by-line processing with standard tools - Easy parsing with JSON libraries +### CSV + +Use `format=csv` for comma-separated output suitable for spreadsheets and offline analysis: + +```shell +gcm slurm_monitor --sink=file --sink-opt file_path=/var/log/gcm/data.csv --sink-opt format=csv --once +``` + +The first write adds a header row; subsequent writes append data rows. + ## Use Cases ### Production Monitoring From 20ec9d8e7b49d83a85218d2befb427fdb04f69a1 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Tue, 10 Mar 2026 02:01:55 -0700 Subject: [PATCH 09/15] fixed the logic in file.py --- README.md | 2 +- gcm/exporters/file.py | 34 ++++++++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 7c8dec6..41b0ccd 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Facebook has adopted a Code of Conduct that we expect project participants to ad ## The Team -GPU Cluster Monitoring is actively maintained by [Lucca Bertoncini](https://github.com/luccabb), [Caleb Ho](https://github.com/calebho), [Apostolos Kokolis](https://github.com/A-Kokolis), [Liao Hu](https://github.com/L1A0), [Thanh Nguyen](https://github.com/giongto35), [Billy Campoli](https://github.com/tooji) with a number of contributions coming from talented individuals (in no particular order, and non-exhaustive): [Jörg Doku](https://github.com/Jorghi12), [Vivian Peng](https://github.com/vzpeng), [Parth Malani](https://github.com/pmmalani), [Kalyan Saladi](https://github.com/skalyan), [Shubho Sengupta](https://github.com/shubho), [Leo Huang](https://github.com/lifeihuang), [Robert Vincent](https://github.com/bvincent-penguin), [Max Wang](https://github.com/mxw), [Sujit Verma](https://github.com/sujitoc), [Teng Li](https://github.com/teng-li), [James Taylor](https://github.com/jamestaylr), [Xiaodong Ma](https://github.com/xman1979), [Chris Henry](https://github.com/chenry3), [Jakob Johnson](https://github.com/jj10306), [Kareem Sakher](https://github.com/kjsakher), [Abinesh Ramakrishnan](https://github.com/ibanesh), [Nabib Ahmed](https://github.com/nahmed3536), [Yong Li](https://github.com/yonglimeta), [Junjie Qian](https://github.com/junjieqian), [David Watson](https://github.com/davidewatson), [Guanyu Wu](https://github.com/kwu-penguin), [Jaromir Latal](https://github.com/jermenkoo), [Samuel Doud](https://github.com/SamuelDoud), [Yidi Wu](https://github.com/ydwu4), [Xinyuan Zhang](https://github.com/xinyuanzzz), [Neha Saxena](https://github.com/nehasaxena210), [Gustavo Lima](https://github.com/gustcol). +GPU Cluster Monitoring is actively maintained by [Lucca Bertoncini](https://github.com/luccabb), [Caleb Ho](https://github.com/calebho), [Apostolos Kokolis](https://github.com/A-Kokolis), [Liao Hu](https://github.com/L1A0), [Thanh Nguyen](https://github.com/giongto35), [Billy Campoli](https://github.com/tooji) with a number of contributions coming from talented individuals (in no particular order, and non-exhaustive): [Jörg Doku](https://github.com/Jorghi12), [Vivian Peng](https://github.com/vzpeng), [Parth Malani](https://github.com/pmmalani), [Kalyan Saladi](https://github.com/skalyan), [Shubho Sengupta](https://github.com/shubho), [Leo Huang](https://github.com/lifeihuang), [Robert Vincent](https://github.com/bvincent-penguin), [Max Wang](https://github.com/mxw), [Sujit Verma](https://github.com/sujitoc), [Teng Li](https://github.com/teng-li), [James Taylor](https://github.com/jamestaylr), [Xiaodong Ma](https://github.com/xman1979), [Chris Henry](https://github.com/chenry3), [Jakob Johnson](https://github.com/jj10306), [Kareem Sakher](https://github.com/kjsakher), [Abinesh Ramakrishnan](https://github.com/ibanesh), [Nabib Ahmed](https://github.com/nahmed3536), [Yong Li](https://github.com/yonglimeta), [Junjie Qian](https://github.com/junjieqian), [David Watson](https://github.com/davidewatson), [Guanyu Wu](https://github.com/kwu-penguin), [Jaromir Latal](https://github.com/jermenkoo), [Samuel Doud](https://github.com/SamuelDoud), [Yidi Wu](https://github.com/ydwu4), [Xinyuan Zhang](https://github.com/xinyuanzzz), [Neha Saxena](https://github.com/nehasaxena210), [Gustavo Lima](https://github.com/gustcol), [Achintya Paningapalli](https://github.com/theap06). Feel free to contribute and add your name! diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index d3a2448..a0898fc 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -3,6 +3,7 @@ from __future__ import annotations import csv +import io import json import logging import os @@ -50,11 +51,12 @@ def __init__( ) self.format = format - self._csv_header_written: Dict[str, bool] = {} self.data_identifier_to_logger_map: Dict[ DataIdentifier, Optional[logging.Logger] ] = {} self._data_identifier_to_path: Dict[DataIdentifier, str] = {} + if self.format == "csv": + self._csv_header_written: Dict[str, bool] = {} if file_path is not None: file_directory, file_name = split_path(file_path) @@ -113,15 +115,31 @@ def write( records = [_flatten_for_csv(p) for p in data.message] if not records: return - os.makedirs(os.path.dirname(path) or ".", exist_ok=True) all_keys = sorted({k for r in records for k in r.keys()}) header_done = self._csv_header_written.get(path, False) - with open(path, "a") as f: - w = csv.DictWriter(f, fieldnames=all_keys, extrasaction="ignore") - if not header_done: - w.writeheader() - self._csv_header_written[path] = True - w.writerows(records) + + if not header_done: + header_buf = io.StringIO() + header_writer = csv.DictWriter( + header_buf, + fieldnames=all_keys, + extrasaction="ignore", + lineterminator="", + ) + header_writer.writeheader() + logger.info(header_buf.getvalue()) + self._csv_header_written[path] = True + + for record in records: + row_buf = io.StringIO() + row_writer = csv.DictWriter( + row_buf, + fieldnames=all_keys, + extrasaction="ignore", + lineterminator="", + ) + row_writer.writerow(record) + logger.info(row_buf.getvalue()) elif self.format == "json": for payload in data.message: # TODO: remove to_scuba_message once slurm_job_monitor migrates to OpenTelemetry exporter From f659f0a7f64aaa98f311596bda5867b29727e755 Mon Sep 17 00:00:00 2001 From: theap06 Date: Wed, 11 Mar 2026 12:02:11 -0700 Subject: [PATCH 10/15] Update gcm/exporters/file.py Co-authored-by: lucca bertoncini <32229669+luccabb@users.noreply.github.com> --- gcm/exporters/file.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index a0898fc..1bb9e06 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -28,8 +28,7 @@ def _flatten_for_csv(payload: object) -> Dict[str, Any]: """Flatten scuba message dict for CSV output.""" - scuba = asdict(to_scuba_message(cast("DataclassInstance", payload))) - flat = asdict_recursive(scuba) + flat = asdict_recursive(to_scuba_message(cast("DataclassInstance", payload))) return flat if isinstance(flat, dict) else {} From c8ec6a2b29f3274bdf69f01214c04f055cc2bdf3 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 11 Mar 2026 14:31:38 -0700 Subject: [PATCH 11/15] fixed the issues with style checker --- dev-requirements.txt | 2 ++ gcm/exporters/file.py | 9 +++++---- gcm/tests/test_file_exporter.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index e82d44b..f0af383 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -183,6 +183,7 @@ keyring==25.7.0 \ --hash=sha256:fe01bd85eb3f8fb3dd0405defdeac9a5b4f6f0439edbb3149577f244a2e8245b # via twine libcst==1.8.1 \ + --hash=sha256:a748502a2ef57834e7f135a51392dc6a431f2d4547f8b2917cd8e27c91d30c61 \ --hash=sha256:423427819409a1d905017bbd51062bd0f1e4795c74c2f9f52a6b63dd67c282d2 \ --hash=sha256:bdad73ce302741354abd2d0ac54add8bbbffb123a176629f65ce16e0dff012f6 # via @@ -374,6 +375,7 @@ pynvml==11.4.1 \ --hash=sha256:d27be542cd9d06558de18e2deffc8022ccd7355bc7382255d477038e7e424c6c # via gcm (pyproject.toml) pyoxidizer==0.24.0 \ + --hash=sha256:1a9940d2bdb6c9e6c6c45eb4de3d4d6e5c9b3a3724dbc7d774d85d4956058447 \ --hash=sha256:ec56f2b99495aa0178e927389a3e151e9669beae4e2bce3f6897fb9891b5502e # via gcm (pyproject.toml) pyproject-hooks==1.2.0 \ diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index 1bb9e06..bec390a 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -55,7 +55,7 @@ def __init__( ] = {} self._data_identifier_to_path: Dict[DataIdentifier, str] = {} if self.format == "csv": - self._csv_header_written: Dict[str, bool] = {} + self._csv_fieldnames: Dict[str, Tuple[str, ...]] = {} if file_path is not None: file_directory, file_name = split_path(file_path) @@ -115,9 +115,10 @@ def write( if not records: return all_keys = sorted({k for r in records for k in r.keys()}) - header_done = self._csv_header_written.get(path, False) + fieldnames = tuple(all_keys) + previous_fieldnames = self._csv_fieldnames.get(path) - if not header_done: + if previous_fieldnames != fieldnames: header_buf = io.StringIO() header_writer = csv.DictWriter( header_buf, @@ -127,7 +128,7 @@ def write( ) header_writer.writeheader() logger.info(header_buf.getvalue()) - self._csv_header_written[path] = True + self._csv_fieldnames[path] = fieldnames for record in records: row_buf = io.StringIO() diff --git a/gcm/tests/test_file_exporter.py b/gcm/tests/test_file_exporter.py index bac6668..79dcd00 100644 --- a/gcm/tests/test_file_exporter.py +++ b/gcm/tests/test_file_exporter.py @@ -16,6 +16,12 @@ class SamplePayload: user: str +@dataclass +class OtherSamplePayload: + gpu_uuid: str + memory_used_mb: int + + def test_file_exporter_csv(tmp_path: Path) -> None: path = tmp_path / "data.csv" sink = File(file_path=str(path), format="csv") @@ -34,3 +40,28 @@ def test_file_exporter_csv(tmp_path: Path) -> None: assert rows[0]["normal.user"] == "alice" assert rows[1]["normal.state"] == "PENDING" assert rows[1]["normal.user"] == "bob" + + +def test_file_exporter_csv_rewrites_header_on_schema_change(tmp_path: Path) -> None: + path = tmp_path / "data.csv" + sink = File(file_path=str(path), format="csv") + + sink.write( + Log(ts=1000, message=[SamplePayload(job_id=1, state="RUNNING", user="alice")]), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + sink.write( + Log( + ts=2000, + message=[OtherSamplePayload(gpu_uuid="GPU-123", memory_used_mb=2048)], + ), + SinkAdditionalParams(data_identifier=DataIdentifier.GENERIC), + ) + + lines = path.read_text().splitlines() + assert lines == [ + "normal.job_id,normal.state,normal.user", + "1,RUNNING,alice", + "normal.gpu_uuid,normal.memory_used_mb", + "GPU-123,2048", + ] From 67af3df9f7f952bd1cfe2e6526e4c3e8cbe6414a Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 11 Mar 2026 16:27:44 -0700 Subject: [PATCH 12/15] Clean PR scope and clarify CSV docs --- README.md | 2 +- dev-requirements.txt | 2 -- website/docs/GCM_Monitoring/exporters/file.md | 3 ++- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 41b0ccd..7c8dec6 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Facebook has adopted a Code of Conduct that we expect project participants to ad ## The Team -GPU Cluster Monitoring is actively maintained by [Lucca Bertoncini](https://github.com/luccabb), [Caleb Ho](https://github.com/calebho), [Apostolos Kokolis](https://github.com/A-Kokolis), [Liao Hu](https://github.com/L1A0), [Thanh Nguyen](https://github.com/giongto35), [Billy Campoli](https://github.com/tooji) with a number of contributions coming from talented individuals (in no particular order, and non-exhaustive): [Jörg Doku](https://github.com/Jorghi12), [Vivian Peng](https://github.com/vzpeng), [Parth Malani](https://github.com/pmmalani), [Kalyan Saladi](https://github.com/skalyan), [Shubho Sengupta](https://github.com/shubho), [Leo Huang](https://github.com/lifeihuang), [Robert Vincent](https://github.com/bvincent-penguin), [Max Wang](https://github.com/mxw), [Sujit Verma](https://github.com/sujitoc), [Teng Li](https://github.com/teng-li), [James Taylor](https://github.com/jamestaylr), [Xiaodong Ma](https://github.com/xman1979), [Chris Henry](https://github.com/chenry3), [Jakob Johnson](https://github.com/jj10306), [Kareem Sakher](https://github.com/kjsakher), [Abinesh Ramakrishnan](https://github.com/ibanesh), [Nabib Ahmed](https://github.com/nahmed3536), [Yong Li](https://github.com/yonglimeta), [Junjie Qian](https://github.com/junjieqian), [David Watson](https://github.com/davidewatson), [Guanyu Wu](https://github.com/kwu-penguin), [Jaromir Latal](https://github.com/jermenkoo), [Samuel Doud](https://github.com/SamuelDoud), [Yidi Wu](https://github.com/ydwu4), [Xinyuan Zhang](https://github.com/xinyuanzzz), [Neha Saxena](https://github.com/nehasaxena210), [Gustavo Lima](https://github.com/gustcol), [Achintya Paningapalli](https://github.com/theap06). +GPU Cluster Monitoring is actively maintained by [Lucca Bertoncini](https://github.com/luccabb), [Caleb Ho](https://github.com/calebho), [Apostolos Kokolis](https://github.com/A-Kokolis), [Liao Hu](https://github.com/L1A0), [Thanh Nguyen](https://github.com/giongto35), [Billy Campoli](https://github.com/tooji) with a number of contributions coming from talented individuals (in no particular order, and non-exhaustive): [Jörg Doku](https://github.com/Jorghi12), [Vivian Peng](https://github.com/vzpeng), [Parth Malani](https://github.com/pmmalani), [Kalyan Saladi](https://github.com/skalyan), [Shubho Sengupta](https://github.com/shubho), [Leo Huang](https://github.com/lifeihuang), [Robert Vincent](https://github.com/bvincent-penguin), [Max Wang](https://github.com/mxw), [Sujit Verma](https://github.com/sujitoc), [Teng Li](https://github.com/teng-li), [James Taylor](https://github.com/jamestaylr), [Xiaodong Ma](https://github.com/xman1979), [Chris Henry](https://github.com/chenry3), [Jakob Johnson](https://github.com/jj10306), [Kareem Sakher](https://github.com/kjsakher), [Abinesh Ramakrishnan](https://github.com/ibanesh), [Nabib Ahmed](https://github.com/nahmed3536), [Yong Li](https://github.com/yonglimeta), [Junjie Qian](https://github.com/junjieqian), [David Watson](https://github.com/davidewatson), [Guanyu Wu](https://github.com/kwu-penguin), [Jaromir Latal](https://github.com/jermenkoo), [Samuel Doud](https://github.com/SamuelDoud), [Yidi Wu](https://github.com/ydwu4), [Xinyuan Zhang](https://github.com/xinyuanzzz), [Neha Saxena](https://github.com/nehasaxena210), [Gustavo Lima](https://github.com/gustcol). Feel free to contribute and add your name! diff --git a/dev-requirements.txt b/dev-requirements.txt index f0af383..e82d44b 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -183,7 +183,6 @@ keyring==25.7.0 \ --hash=sha256:fe01bd85eb3f8fb3dd0405defdeac9a5b4f6f0439edbb3149577f244a2e8245b # via twine libcst==1.8.1 \ - --hash=sha256:a748502a2ef57834e7f135a51392dc6a431f2d4547f8b2917cd8e27c91d30c61 \ --hash=sha256:423427819409a1d905017bbd51062bd0f1e4795c74c2f9f52a6b63dd67c282d2 \ --hash=sha256:bdad73ce302741354abd2d0ac54add8bbbffb123a176629f65ce16e0dff012f6 # via @@ -375,7 +374,6 @@ pynvml==11.4.1 \ --hash=sha256:d27be542cd9d06558de18e2deffc8022ccd7355bc7382255d477038e7e424c6c # via gcm (pyproject.toml) pyoxidizer==0.24.0 \ - --hash=sha256:1a9940d2bdb6c9e6c6c45eb4de3d4d6e5c9b3a3724dbc7d774d85d4956058447 \ --hash=sha256:ec56f2b99495aa0178e927389a3e151e9669beae4e2bce3f6897fb9891b5502e # via gcm (pyproject.toml) pyproject-hooks==1.2.0 \ diff --git a/website/docs/GCM_Monitoring/exporters/file.md b/website/docs/GCM_Monitoring/exporters/file.md index 99f7ae7..d489567 100644 --- a/website/docs/GCM_Monitoring/exporters/file.md +++ b/website/docs/GCM_Monitoring/exporters/file.md @@ -59,7 +59,8 @@ Use `format=csv` for comma-separated output suitable for spreadsheets and offlin gcm slurm_monitor --sink=file --sink-opt file_path=/var/log/gcm/data.csv --sink-opt format=csv --once ``` -The first write adds a header row; subsequent writes append data rows. +The first write adds a header row; subsequent writes append data rows. If a later +payload has a different schema, a new header row is written before those rows. ## Use Cases From 489555d3e120f8c1c3f98847444d45dfc075eef9 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 11 Mar 2026 16:41:48 -0700 Subject: [PATCH 13/15] Fix file exporter CSV test expectations --- dev-requirements.txt | 2 ++ gcm/tests/test_file_exporter.py | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index e82d44b..f0af383 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -183,6 +183,7 @@ keyring==25.7.0 \ --hash=sha256:fe01bd85eb3f8fb3dd0405defdeac9a5b4f6f0439edbb3149577f244a2e8245b # via twine libcst==1.8.1 \ + --hash=sha256:a748502a2ef57834e7f135a51392dc6a431f2d4547f8b2917cd8e27c91d30c61 \ --hash=sha256:423427819409a1d905017bbd51062bd0f1e4795c74c2f9f52a6b63dd67c282d2 \ --hash=sha256:bdad73ce302741354abd2d0ac54add8bbbffb123a176629f65ce16e0dff012f6 # via @@ -374,6 +375,7 @@ pynvml==11.4.1 \ --hash=sha256:d27be542cd9d06558de18e2deffc8022ccd7355bc7382255d477038e7e424c6c # via gcm (pyproject.toml) pyoxidizer==0.24.0 \ + --hash=sha256:1a9940d2bdb6c9e6c6c45eb4de3d4d6e5c9b3a3724dbc7d774d85d4956058447 \ --hash=sha256:ec56f2b99495aa0178e927389a3e151e9669beae4e2bce3f6897fb9891b5502e # via gcm (pyproject.toml) pyproject-hooks==1.2.0 \ diff --git a/gcm/tests/test_file_exporter.py b/gcm/tests/test_file_exporter.py index 79dcd00..90ebbfd 100644 --- a/gcm/tests/test_file_exporter.py +++ b/gcm/tests/test_file_exporter.py @@ -36,10 +36,10 @@ def test_file_exporter_csv(tmp_path: Path) -> None: with open(path) as f: rows = list(csv.DictReader(f)) assert len(rows) == 2 - assert rows[0]["normal.state"] == "RUNNING" - assert rows[0]["normal.user"] == "alice" - assert rows[1]["normal.state"] == "PENDING" - assert rows[1]["normal.user"] == "bob" + assert rows[0]["state"] == "RUNNING" + assert rows[0]["user"] == "alice" + assert rows[1]["state"] == "PENDING" + assert rows[1]["user"] == "bob" def test_file_exporter_csv_rewrites_header_on_schema_change(tmp_path: Path) -> None: @@ -60,8 +60,8 @@ def test_file_exporter_csv_rewrites_header_on_schema_change(tmp_path: Path) -> N lines = path.read_text().splitlines() assert lines == [ - "normal.job_id,normal.state,normal.user", + "job_id,state,user", "1,RUNNING,alice", - "normal.gpu_uuid,normal.memory_used_mb", + "gpu_uuid,memory_used_mb", "GPU-123,2048", ] From a151464374412f5c921a024b6ab4d3ff539b728d Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 11 Mar 2026 22:31:20 -0700 Subject: [PATCH 14/15] Roll CSV output to new file on schema change --- gcm/exporters/file.py | 40 +++++++++++++++++++ gcm/tests/test_file_exporter.py | 7 +++- website/docs/GCM_Monitoring/exporters/file.md | 3 +- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index bec390a..52ad910 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -26,6 +26,13 @@ ) +def _schema_versioned_path(path: str, schema_index: int) -> str: + if schema_index == 0: + return path + stem, ext = os.path.splitext(path) + return f"{stem}_{schema_index}{ext}" + + def _flatten_for_csv(payload: object) -> Dict[str, Any]: """Flatten scuba message dict for CSV output.""" flat = asdict_recursive(to_scuba_message(cast("DataclassInstance", payload))) @@ -54,12 +61,17 @@ def __init__( DataIdentifier, Optional[logging.Logger] ] = {} self._data_identifier_to_path: Dict[DataIdentifier, str] = {} + self._data_identifier_to_base_path: Dict[DataIdentifier, str] = {} if self.format == "csv": self._csv_fieldnames: Dict[str, Tuple[str, ...]] = {} + self._csv_schema_index: Dict[DataIdentifier, int] = {} if file_path is not None: file_directory, file_name = split_path(file_path) self._data_identifier_to_path[DataIdentifier.GENERIC] = file_path + self._data_identifier_to_base_path[DataIdentifier.GENERIC] = file_path + if self.format == "csv": + self._csv_schema_index[DataIdentifier.GENERIC] = 0 self.data_identifier_to_logger_map[DataIdentifier.GENERIC], _ = init_logger( logger_name=__name__ + file_path, log_dir=file_directory, @@ -70,6 +82,9 @@ def __init__( if job_file_path is not None: file_directory, file_name = split_path(job_file_path) self._data_identifier_to_path[DataIdentifier.JOB] = job_file_path + self._data_identifier_to_base_path[DataIdentifier.JOB] = job_file_path + if self.format == "csv": + self._csv_schema_index[DataIdentifier.JOB] = 0 self.data_identifier_to_logger_map[DataIdentifier.JOB], _ = init_logger( logger_name=__name__ + job_file_path, log_dir=file_directory, @@ -80,6 +95,9 @@ def __init__( if node_file_path is not None: file_directory, file_name = split_path(node_file_path) self._data_identifier_to_path[DataIdentifier.NODE] = node_file_path + self._data_identifier_to_base_path[DataIdentifier.NODE] = node_file_path + if self.format == "csv": + self._csv_schema_index[DataIdentifier.NODE] = 0 self.data_identifier_to_logger_map[DataIdentifier.NODE], _ = init_logger( logger_name=__name__ + node_file_path, log_dir=file_directory, @@ -118,6 +136,28 @@ def write( fieldnames = tuple(all_keys) previous_fieldnames = self._csv_fieldnames.get(path) + if ( + previous_fieldnames is not None + and previous_fieldnames != fieldnames + and data_identifier in self._csv_schema_index + ): + next_schema_index = self._csv_schema_index[data_identifier] + 1 + self._csv_schema_index[data_identifier] = next_schema_index + + base_path = self._data_identifier_to_base_path[data_identifier] + path = _schema_versioned_path(base_path, next_schema_index) + self._data_identifier_to_path[data_identifier] = path + + file_directory, file_name = split_path(path) + logger, _ = init_logger( + logger_name=__name__ + path, + log_dir=file_directory, + log_name=file_name, + log_formatter=None, + ) + self.data_identifier_to_logger_map[data_identifier] = logger + previous_fieldnames = self._csv_fieldnames.get(path) + if previous_fieldnames != fieldnames: header_buf = io.StringIO() header_writer = csv.DictWriter( diff --git a/gcm/tests/test_file_exporter.py b/gcm/tests/test_file_exporter.py index 90ebbfd..d19163a 100644 --- a/gcm/tests/test_file_exporter.py +++ b/gcm/tests/test_file_exporter.py @@ -42,7 +42,7 @@ def test_file_exporter_csv(tmp_path: Path) -> None: assert rows[1]["user"] == "bob" -def test_file_exporter_csv_rewrites_header_on_schema_change(tmp_path: Path) -> None: +def test_file_exporter_csv_rolls_over_on_schema_change(tmp_path: Path) -> None: path = tmp_path / "data.csv" sink = File(file_path=str(path), format="csv") @@ -62,6 +62,11 @@ def test_file_exporter_csv_rewrites_header_on_schema_change(tmp_path: Path) -> N assert lines == [ "job_id,state,user", "1,RUNNING,alice", + ] + + path_rollover = tmp_path / "data_1.csv" + rollover_lines = path_rollover.read_text().splitlines() + assert rollover_lines == [ "gpu_uuid,memory_used_mb", "GPU-123,2048", ] diff --git a/website/docs/GCM_Monitoring/exporters/file.md b/website/docs/GCM_Monitoring/exporters/file.md index d489567..edd5b02 100644 --- a/website/docs/GCM_Monitoring/exporters/file.md +++ b/website/docs/GCM_Monitoring/exporters/file.md @@ -60,7 +60,8 @@ gcm slurm_monitor --sink=file --sink-opt file_path=/var/log/gcm/data.csv --sink- ``` The first write adds a header row; subsequent writes append data rows. If a later -payload has a different schema, a new header row is written before those rows. +payload has a different schema, the exporter creates a new file with a numeric +suffix (for example, `data_1.csv`) and writes the new header there. ## Use Cases From a2dec03a33fc58d77f5f33e3ac73874426f41ce5 Mon Sep 17 00:00:00 2001 From: Achintya P Date: Wed, 11 Mar 2026 22:55:40 -0700 Subject: [PATCH 15/15] Reuse CSV writer buffer per batch --- gcm/exporters/file.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/gcm/exporters/file.py b/gcm/exporters/file.py index 52ad910..1f97c89 100644 --- a/gcm/exporters/file.py +++ b/gcm/exporters/file.py @@ -170,14 +170,16 @@ def write( logger.info(header_buf.getvalue()) self._csv_fieldnames[path] = fieldnames + row_buf = io.StringIO() + row_writer = csv.DictWriter( + row_buf, + fieldnames=all_keys, + extrasaction="ignore", + lineterminator="", + ) for record in records: - row_buf = io.StringIO() - row_writer = csv.DictWriter( - row_buf, - fieldnames=all_keys, - extrasaction="ignore", - lineterminator="", - ) + row_buf.seek(0) + row_buf.truncate(0) row_writer.writerow(record) logger.info(row_buf.getvalue()) elif self.format == "json":