diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py new file mode 100644 index 000000000..9d8b3ab09 --- /dev/null +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -0,0 +1,1150 @@ +############################################################################### +# Copyright (c) 2025, Advanced Micro Devices, Inc. All rights reserved. +# +# See LICENSE for license information. +############################################################################### + +""" +MLflow Artifact Logging Utilities with TraceLens Integration + +This module provides functions to upload trace files, log files, and +TraceLens analysis reports to MLflow when MLflow tracking is enabled. + +Features: +- Upload profiler trace files from all profiled ranks (including multi-node) +- Upload log files from all levels and all ranks +- Generate and upload TraceLens trace analysis reports +- Supports both local and distributed training scenarios + +MLflow Artifact Structure: + artifacts/ + ├── traces/ # PyTorch profiler trace files + │ ├── rank_0_step_2.json.gz + │ └── ... + ├── logs/ # Training log files + │ └── log_mp_pretrain.txt + └── trace_analysis/ # TraceLens analysis reports + ├── rank_0_analysis.xlsx # Multi-tab Excel (default) + └── ... + +TraceLens Report Formats: + - xlsx: Multi-tab Excel (default; single parse, fastest) + - csv: Directory of CSV files per rank (kernels, memory, communication, etc.) + - all: Both xlsx and csv (parses trace twice, ~2x processing time; use when both formats needed) +""" + +import glob +import os +import re +import subprocess +import sys +from typing import List, Optional + +from primus.modules.module_utils import log_rank_0, warning_rank_0 + +# Pinned to immutable commit SHA for supply-chain safety (tags can be moved). +# This corresponds to tag v0.4.0 in AMD-AGI/TraceLens. 
TRACELENS_INSTALL_REF = "0cba6840e20bf3bda74f26bed27a3497017101e6"


def _get_all_trace_files(tensorboard_dir: str) -> List[str]:
    """
    Find all profiler trace files in the tensorboard directory.

    Trace files are typically named like:
    - *.pt.trace.json
    - *.pt.trace.json.gz

    Args:
        tensorboard_dir: Path to the tensorboard directory containing trace files

    Returns:
        List of unique paths to trace files (first-seen order preserved)
    """
    if not tensorboard_dir or not os.path.exists(tensorboard_dir):
        return []

    trace_files: List[str] = []
    # Look for PyTorch profiler trace files (both compressed and uncompressed)
    # Using specific patterns to avoid matching unrelated JSON files
    patterns = ["*.pt.trace.json", "*.pt.trace.json.gz"]
    # Escape directory path to handle special characters like [] in experiment names
    escaped_dir = glob.escape(tensorboard_dir)
    for pattern in patterns:
        trace_files.extend(glob.glob(os.path.join(escaped_dir, "**", pattern), recursive=True))

    # Deduplicate while preserving order: dict.fromkeys is the idiomatic
    # replacement for a manual seen-set loop (dicts keep insertion order).
    return list(dict.fromkeys(trace_files))


def _get_all_log_files(exp_root_path: str) -> List[str]:
    """
    Find all log files in the experiment logs directory.

    Log files are organized as:
    - {exp_root_path}/logs/master/master-*.log
    - {exp_root_path}/logs/{module_name}/rank-{rank}/*.log

    Args:
        exp_root_path: Root path of the experiment

    Returns:
        List of paths to log files
    """
    if not exp_root_path:
        return []

    logs_dir = os.path.join(exp_root_path, "logs")
    if not os.path.exists(logs_dir):
        return []

    # Find all .log files recursively (escape path to handle special characters)
    return glob.glob(os.path.join(glob.escape(logs_dir), "**", "*.log"), recursive=True)


def upload_trace_files_to_mlflow(
    mlflow_writer,
    tensorboard_dir: str,
    artifact_path: str = "traces",
) -> int:
    """
    Upload all profiler trace files to MLflow as artifacts.

    This function collects trace files from the tensorboard directory and
    uploads them to MLflow. In distributed settings, only the last rank
    (where the MLflow writer is initialized) should call this.

    Args:
        mlflow_writer: The MLflow module instance (from get_mlflow_writer())
        tensorboard_dir: Path to the tensorboard directory containing trace files
        artifact_path: MLflow artifact subdirectory for trace files

    Returns:
        Number of trace files uploaded
    """
    if mlflow_writer is None:
        return 0

    log_rank_0(f"[MLflow] Searching for trace files in: {tensorboard_dir}")
    trace_files = _get_all_trace_files(tensorboard_dir)
    if len(trace_files) > 5:
        log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files[:5]}...")
    else:
        log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files}")

    if not trace_files:
        log_rank_0("[MLflow] No trace files found to upload")
        return 0

    uploaded_count = 0
    for trace_file in trace_files:
        try:
            # Get relative path from tensorboard_dir for artifact organization;
            # mirror the on-disk sub-directory layout inside the artifact path.
            rel_path = os.path.relpath(trace_file, tensorboard_dir)
            sub_dir = os.path.dirname(rel_path)  # hoisted: was computed twice
            artifact_subpath = os.path.join(artifact_path, sub_dir) if sub_dir else artifact_path

            mlflow_writer.log_artifact(trace_file, artifact_path=artifact_subpath)
            uploaded_count += 1
            log_rank_0(f"[MLflow] Uploaded trace file: {os.path.basename(trace_file)}")
        except Exception as e:
            # Best-effort: keep uploading remaining files even if one fails.
            warning_rank_0(f"[MLflow] Failed to upload trace file {trace_file}: {e}")

    log_rank_0(f"[MLflow] Uploaded {uploaded_count} trace files to '{artifact_path}'")
    return uploaded_count


def upload_log_files_to_mlflow(
    mlflow_writer,
    exp_root_path: str,
    artifact_path: str = "logs",
) -> int:
    """
    Upload all log files to MLflow as artifacts.

    This function collects log files from all ranks and all log levels
    and uploads them to MLflow. The directory structure is preserved
    in the artifact path.

    Args:
        mlflow_writer: The MLflow module instance (from get_mlflow_writer())
        exp_root_path: Root path of the experiment
        artifact_path: MLflow artifact subdirectory for log files

    Returns:
        Number of log files uploaded
    """
    if mlflow_writer is None:
        return 0

    log_files = _get_all_log_files(exp_root_path)

    if not log_files:
        log_rank_0("[MLflow] No log files found to upload")
        return 0

    logs_base_dir = os.path.join(exp_root_path, "logs")
    uploaded_count = 0

    for log_file in log_files:
        try:
            # Preserve directory structure relative to logs base directory
            rel_path = os.path.relpath(log_file, logs_base_dir)
            sub_dir = os.path.dirname(rel_path)  # hoisted: was computed twice
            artifact_subpath = os.path.join(artifact_path, sub_dir) if sub_dir else artifact_path

            mlflow_writer.log_artifact(log_file, artifact_path=artifact_subpath)
            uploaded_count += 1
        except Exception as e:
            # Best-effort: keep uploading remaining files even if one fails.
            warning_rank_0(f"[MLflow] Failed to upload log file {log_file}: {e}")

    log_rank_0(f"[MLflow] Uploaded {uploaded_count} log files to '{artifact_path}'")
    return uploaded_count


# =============================================================================
# TraceLens Integration
# =============================================================================


def _ensure_openpyxl_installed(auto_install: bool = True) -> bool:
    """
    Ensure openpyxl is installed for XLSX generation.

    Args:
        auto_install: If False, never run pip; just report availability.

    Returns:
        True if openpyxl is available, False otherwise
    """
    try:
        import openpyxl  # noqa: F401

        return True
    except ImportError:
        if not auto_install:
            warning_rank_0("[TraceLens] openpyxl not installed and auto-install disabled; skipping install.")
            return False
        log_rank_0("[TraceLens] openpyxl not found, installing for XLSX support...")
        try:
            subprocess.run(
                [sys.executable, "-m", "pip", "install", "openpyxl", "-q"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
                timeout=300,
            )
            log_rank_0("[TraceLens] Successfully installed openpyxl")
            return True
        except subprocess.TimeoutExpired:
            warning_rank_0("[TraceLens] openpyxl install timed out after 300s. Skipping install.")
            return False
        except subprocess.CalledProcessError as e:
            # Surface pip's own output so the failure is diagnosable from the log.
            stdout_output = e.stdout.strip() if e.stdout else "No stdout output captured."
            stderr_output = e.stderr.strip() if e.stderr else "No stderr output captured."
            warning_rank_0(
                f"[TraceLens] Failed to install openpyxl: {e}\n"
                f"[TraceLens] pip stdout: {stdout_output}\n"
                f"[TraceLens] pip stderr: {stderr_output}"
            )
            return False


def _verify_tracelens_ref_exists(ref: str) -> bool:
    """
    Verify that the TraceLens git reference exists before installing.

    Returns:
        True if the ref exists or verification is skipped, False otherwise
    """
    is_commit_sha = bool(re.fullmatch(r"[0-9a-fA-F]{7,40}", ref))
    try:
        # For a branch/tag name we can ask ls-remote for that ref directly;
        # for a raw SHA we must list everything and grep (ls-remote cannot
        # query arbitrary commit objects).
        ls_remote_cmd = ["git", "ls-remote", "https://github.com/AMD-AGI/TraceLens.git"]
        if not is_commit_sha:
            ls_remote_cmd.append(ref)
        result = subprocess.run(
            ls_remote_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
            timeout=10,
        )
    except FileNotFoundError:
        # No git binary: don't block the install on verification we cannot run.
        warning_rank_0("[TraceLens] git not found; skipping TraceLens ref verification.")
        return True
    except subprocess.TimeoutExpired:
        warning_rank_0("[TraceLens] TraceLens ref verification timed out.")
        return False
    except subprocess.CalledProcessError as e:
        stderr_output = e.stderr.strip() if e.stderr else "No stderr output captured."
        warning_rank_0(
            f"[TraceLens] TraceLens ref verification failed: {e}\n" f"[TraceLens] git stderr: {stderr_output}"
        )
        return False

    output = result.stdout.strip()
    if not output:
        warning_rank_0(f"[TraceLens] TraceLens ref '{ref}' not found; skipping install.")
        return False
    if is_commit_sha:
        # NOTE(review): ls-remote only lists ref tips (plus peeled "^{}" tag
        # entries), so a SHA that is no longer pointed to by any branch/tag
        # will fail this check even though it exists upstream — confirm the
        # pinned SHA stays tag-reachable when bumping TRACELENS_INSTALL_REF.
        sha_lower = ref.lower()
        if not any(line.lower().startswith(sha_lower) for line in output.splitlines()):
            warning_rank_0(f"[TraceLens] TraceLens SHA '{ref}' not found; skipping install.")
            return False

    return True


def _ensure_tracelens_installed(auto_install: bool = True) -> bool:
    """
    Ensure TraceLens and its dependencies are installed.

    TraceLens is available from GitHub: https://github.com/AMD-AGI/TraceLens
    XLSX generation requires openpyxl which is installed separately.

    Returns:
        True if TraceLens is available, False otherwise
    """
    try:
        import TraceLens  # noqa: F401

        log_rank_0("[TraceLens] TraceLens is already installed")
    except ImportError:
        if not auto_install:
            warning_rank_0("[TraceLens] TraceLens not installed and auto-install disabled.")
            return False
        log_rank_0("[TraceLens] TraceLens not found, attempting to install from GitHub...")
        try:
            # TraceLens is on GitHub, not PyPI; pin to a commit SHA for reproducibility and supply-chain safety
            install_spec = f"git+https://github.com/AMD-AGI/TraceLens.git@{TRACELENS_INSTALL_REF}"
            if not _verify_tracelens_ref_exists(TRACELENS_INSTALL_REF):
                return False
            subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "pip",
                    "install",
                    install_spec,
                    "-q",
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
                timeout=300,
            )
            log_rank_0(
                f"[TraceLens] Successfully installed TraceLens from GitHub (ref={TRACELENS_INSTALL_REF})"
            )
            # Re-import to confirm the freshly installed package is actually importable
            # in this process (it may not be if site-packages was already scanned).
            try:
                import TraceLens  # noqa: F401
            except ImportError:
                warning_rank_0(
                    "[TraceLens] TraceLens install completed but import failed. " "A restart may be required."
                )
                return False
        except subprocess.TimeoutExpired:
            warning_rank_0("[TraceLens] TraceLens install timed out after 300s. Skipping install.")
            return False
        except subprocess.CalledProcessError as e:
            stdout_output = e.stdout.strip() if e.stdout else "No stdout output captured."
            stderr_output = e.stderr.strip() if e.stderr else "No stderr output captured."
            warning_rank_0(
                f"[TraceLens] Failed to install TraceLens: {e}\n"
                f"[TraceLens] pip stdout: {stdout_output}\n"
                f"[TraceLens] pip stderr: {stderr_output}"
            )
            return False

    return True


def _extract_rank_from_filename(filename: str) -> Optional[int]:
    """
    Extract rank number from trace filename.

    Expected patterns:
    - rank_0_step_2.json.gz, rank_15_step_1.pt.trace.json (rank_N_)
    - rank_0.pt.trace.json, rank_0.pt.trace.json.gz (rank_N. with dot after rank)
    - primus-megatron-exp-rank[0].*.json (rank[N], -rankN., _rankN.)

    Args:
        filename: The trace filename

    Returns:
        Rank number or None if not found
    """
    # Try pattern: rank_N_, rank_N. (dot), rank[N], -rankN., _rankN.
    patterns = [
        r"rank_(\d+)_",
        r"rank_(\d+)\.",  # e.g. rank_0.pt.trace.json.gz
        r"rank\[(\d+)\]",
        r"-rank(\d+)\.",
        r"_rank(\d+)\.",
    ]

    for pattern in patterns:
        match = re.search(pattern, filename)
        if match:
            return int(match.group(1))

    return None


def _normalize_tracelens_ranks(ranks: Optional[List[int]]) -> Optional[List[int]]:
    """Normalize and validate TraceLens rank filters."""
    if ranks is None:
        return None

    # Config/CLI may pass ranks as a string like "[0, 8]"; parse it safely
    # with literal_eval (never eval) before validating.
    if isinstance(ranks, str):
        import ast

        try:
            ranks = ast.literal_eval(ranks)
        except (ValueError, SyntaxError) as e:
            warning_rank_0(f"[TraceLens] Failed to parse ranks '{ranks}': {e}. Disabling rank filter.")
            return None

    if not isinstance(ranks, list):
        warning_rank_0(
            f"[TraceLens] Ranks evaluated to {type(ranks).__name__}, expected list. Disabling rank filter."
        )
        return None

    normalized = []
    invalid = []
    for rank in ranks:
        # bool is a subclass of int — reject explicitly so True/False are not
        # silently treated as ranks 1/0.
        if isinstance(rank, bool):
            invalid.append(rank)
            continue
        try:
            rank_int = int(rank)
        except (TypeError, ValueError):
            invalid.append(rank)
            continue
        if rank_int < 0:
            invalid.append(rank)
            continue
        normalized.append(rank_int)

    if invalid:
        warning_rank_0("[TraceLens] Ignoring invalid ranks: " + ", ".join(str(rank) for rank in invalid))

    if not normalized:
        warning_rank_0("[TraceLens] No valid ranks provided after validation.")
        return []

    # Best-effort world-size sanity check from the launcher environment
    # (torchrun sets WORLD_SIZE, Slurm sets SLURM_NTASKS); 0 disables the check.
    try:
        world_size = int(os.environ.get("WORLD_SIZE", os.environ.get("SLURM_NTASKS", "0")))
    except ValueError:
        world_size = 0

    if world_size > 0:
        out_of_range = [rank for rank in normalized if rank >= world_size]
        if out_of_range:
            warning_rank_0(f"[TraceLens] Ignoring ranks outside world_size={world_size}: {out_of_range}")
            normalized = [rank for rank in normalized if rank < world_size]
            if not normalized:
                warning_rank_0("[TraceLens] No valid ranks remain after world_size filtering.")
                return []

    return sorted(set(normalized))


def _normalize_tracelens_output_format(output_format: str) -> str:
    """Normalize and validate TraceLens output format."""
    if output_format is None:
        warning_rank_0("[TraceLens] output_format is None; defaulting to 'xlsx'.")
        return "xlsx"

    normalized = str(output_format).strip().lower()
    if normalized in ("xlsx", "csv", "all"):
        return normalized

    warning_rank_0(
        f"[TraceLens] Invalid output_format '{output_format}'; "
        "expected 'xlsx', 'csv', or 'all'. Defaulting to 'xlsx'."
    )
    return "xlsx"


def _filter_traces_by_rank(trace_files: List[str], ranks: List[int]) -> List[str]:
    """
    Filter trace files to only include specified ranks.

    Args:
        trace_files: List of trace file paths
        ranks: List of rank numbers to include

    Returns:
        Filtered list of trace files
    """
    # None means "no filter"; an empty list means "nothing selected".
    if ranks is None:
        return trace_files
    if not ranks:
        return []

    filtered = []
    for trace_file in trace_files:
        rank = _extract_rank_from_filename(os.path.basename(trace_file))
        if rank is not None and rank in ranks:
            filtered.append(trace_file)

    return filtered


def generate_tracelens_report(
    trace_file: str,
    output_dir: str,
    report_name: Optional[str] = None,
    output_format: str = "xlsx",
    auto_install: bool = True,
) -> List[str]:
    """
    Generate a TraceLens analysis report for a single trace file.

    Args:
        trace_file: Path to the PyTorch profiler trace file (JSON/JSON.GZ)
        output_dir: Directory to save the report
        report_name: Optional custom name for the report (base name for CSVs)
        output_format: Output format:
            - "xlsx" (default): Single multi-tab Excel; one trace parse, fastest.
            - "csv": Multiple CSV files (kernels, memory, communication, etc.)
              saved under {output_dir}/{report_name}/*.csv.
            - "all": Both XLSX and CSV; trace is parsed twice (~2x processing time).
              XLSX: {output_dir}/{report_name}_analysis.xlsx
              CSVs: {output_dir}/{report_name}/*.csv
              Prefer "xlsx" or "csv" to avoid this overhead unless both are needed.

    Returns:
        List of paths to generated report files
    """
    if not os.path.exists(trace_file):
        warning_rank_0(f"[TraceLens] Trace file not found: {trace_file}")
        return []

    output_format = _normalize_tracelens_output_format(output_format)

    os.makedirs(output_dir, exist_ok=True)

    # Generate base name from trace filename if not provided
    if report_name is None:
        base_name = os.path.basename(trace_file)
        # Remove extensions like .json.gz (check most specific first so e.g.
        # rank_0.pt.trace.json.gz -> rank_0)
        for trace_ext in [".pt.trace.json.gz", ".pt.trace.json", ".json.gz", ".json"]:
            if base_name.endswith(trace_ext):
                base_name = base_name[: -len(trace_ext)]
                break
        report_name = base_name

    try:
        # Try using TraceLens Python API directly
        from TraceLens.Reporting import generate_perf_report_pytorch

        # Only ensure openpyxl when XLSX output is requested (avoids pip install in CSV-only or restricted envs)
        if output_format in ("xlsx", "all"):
            if not _ensure_openpyxl_installed(auto_install=auto_install):
                warning_rank_0("[TraceLens] openpyxl unavailable; downgrading output_format to 'csv'.")
                output_format = "csv"

        generated_files = []

        # For "all" format: TraceLens uses either/or logic - if output_csvs_dir is set,
        # it ONLY generates CSVs. So we need to call it twice for both formats.
        # Performance: trace file is parsed twice intentionally (~2x time; large traces can be hundreds of MB).
        # A future workaround could write CSVs from the DataFrames returned by the first call
        # if TraceLens API exposes a suitable export; for now we accept the double parse.
        if output_format == "all":
            warning_rank_0(
                "[TraceLens] output_format='all' parses the trace file twice (~2x processing time). "
                "Use 'xlsx' or 'csv' if only one format is needed."
            )
            xlsx_path = os.path.join(output_dir, f"{report_name}_analysis.xlsx")
            csv_subdir = os.path.join(output_dir, report_name)
            os.makedirs(csv_subdir, exist_ok=True)

            # First call: Generate XLSX only
            dfs_xlsx = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path)

            # Check XLSX output
            if os.path.exists(xlsx_path):
                num_tabs = len(dfs_xlsx) if dfs_xlsx else 0
                log_rank_0(
                    f"[TraceLens] Generated XLSX report with {num_tabs} tabs: {os.path.basename(xlsx_path)}"
                )
                generated_files.append(xlsx_path)

            # Second call: Generate CSVs only
            # Snapshot pre-existing CSVs so we can tell whether this call produced anything new.
            existing_csv_files = set(glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv")))
            _ = generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir)

            # Check CSV outputs (escape path to handle [] characters in filenames)
            csv_files = glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv"))
            new_csv_files = [f for f in csv_files if f not in existing_csv_files]
            if new_csv_files:
                log_rank_0(f"[TraceLens] Generated {len(new_csv_files)} CSV files for {report_name}")
                generated_files.append(csv_subdir)  # Upload directory to preserve structure
            else:
                warning_rank_0(f"[TraceLens] No new CSV files generated for {report_name}")

        elif output_format == "xlsx":
            # XLSX only: Single file with multiple tabs
            xlsx_path = os.path.join(output_dir, f"{report_name}_analysis.xlsx")
            dfs_xlsx = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path)
            if os.path.exists(xlsx_path):
                num_tabs = len(dfs_xlsx) if dfs_xlsx else 0
                log_rank_0(
                    f"[TraceLens] Generated XLSX report with {num_tabs} tabs: {os.path.basename(xlsx_path)}"
                )
                generated_files.append(xlsx_path)

        elif output_format == "csv":
            # CSV only: Multiple files in a subdirectory per rank
            csv_subdir = os.path.join(output_dir, report_name)
            os.makedirs(csv_subdir, exist_ok=True)
            existing_csv_files = set(glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv")))
            _ = generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir)

            # Collect all generated CSV files (escape path to handle [] characters in filenames)
            csv_files = glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv"))
            new_csv_files = [f for f in csv_files if f not in existing_csv_files]
            if new_csv_files:
                log_rank_0(f"[TraceLens] Generated {len(new_csv_files)} CSV files for {report_name}")
                generated_files.append(csv_subdir)  # Upload directory to preserve structure
            else:
                warning_rank_0(f"[TraceLens] No new CSV files generated for {report_name}")

        if generated_files:
            return generated_files

        warning_rank_0(f"[TraceLens] No output files generated for: {trace_file}")
        return []

    except ImportError:
        warning_rank_0(
            "[TraceLens] TraceLens not available. Using simplified fallback CSV summary. "
            "Install TraceLens for comprehensive kernel, memory, and communication analysis."
        )
        # Fallback to simple CSV summary (basic stats only, may not handle all trace formats)
        csv_path = _generate_trace_summary_csv(trace_file, output_dir, f"{report_name}_summary.csv")
        return [csv_path] if csv_path else []

    except Exception as e:
        # Deliberately broad: report generation is best-effort and must never
        # take down the training/teardown path; we degrade to the CSV summary.
        warning_rank_0(
            f"[TraceLens] Error generating report: {e}. "
            "Using simplified fallback CSV summary with basic statistics only."
        )
        # Fallback to simple CSV summary (basic stats only, may not handle all trace formats)
        csv_path = _generate_trace_summary_csv(trace_file, output_dir, f"{report_name}_summary.csv")
        return [csv_path] if csv_path else []


def _generate_trace_summary_csv(
    trace_file: str,
    output_dir: str,
    report_name: str,
) -> Optional[str]:
    """
    Generate a CSV summary from a PyTorch profiler trace file.

    This is a fallback when TraceLens is not available.
    Extracts key metrics from the trace JSON and writes to CSV.
    Args:
        trace_file: Path to the trace file
        output_dir: Output directory
        report_name: Name for the CSV file

    Returns:
        Path to generated CSV or None if failed
    """
    import csv
    import gzip
    import json

    try:
        # Load trace file (profiler traces may be gzip-compressed)
        if trace_file.endswith(".gz"):
            with gzip.open(trace_file, "rt", encoding="utf-8") as f:
                trace_data = json.load(f)
        else:
            with open(trace_file, "r", encoding="utf-8") as f:
                trace_data = json.load(f)

        # Extract events from trace
        events = trace_data.get("traceEvents", [])
        if not events:
            warning_rank_0(f"[TraceLens] No events found in trace: {trace_file}")
            return None

        # Aggregate kernel/operation statistics keyed by op name
        op_stats = {}
        for event in events:
            if event.get("cat") in ["kernel", "gpu_memcpy", "cuda_runtime", "cpu_op"]:
                name = event.get("name", "unknown")
                dur = event.get("dur", 0)  # duration in microseconds

                if name not in op_stats:
                    op_stats[name] = {"count": 0, "total_us": 0, "min_us": float("inf"), "max_us": 0}

                op_stats[name]["count"] += 1
                op_stats[name]["total_us"] += dur
                op_stats[name]["min_us"] = min(op_stats[name]["min_us"], dur)
                op_stats[name]["max_us"] = max(op_stats[name]["max_us"], dur)

        # Filter out any operations with zero count (defensive; should not normally occur)
        op_stats = {name: stats for name, stats in op_stats.items() if stats["count"] > 0}
        if not op_stats:
            warning_rank_0(f"[TraceLens] No kernel/op events found in trace: {trace_file}")
            return None

        # Sort by total time descending
        sorted_ops = sorted(op_stats.items(), key=lambda x: x[1]["total_us"], reverse=True)

        # Write CSV
        output_path = os.path.join(output_dir, report_name)
        with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(
                [
                    "Operation",
                    "Count",
                    "Total Time (ms)",
                    "Avg Time (ms)",
                    "Min Time (ms)",
                    "Max Time (ms)",
                    "% of Total",
                ]
            )

            total_time = sum(stats["total_us"] for _, stats in sorted_ops)
            for name, stats in sorted_ops:
                avg_us = stats["total_us"] / stats["count"] if stats["count"] > 0 else 0
                pct = (stats["total_us"] / total_time * 100) if total_time > 0 else 0
                # Durations are recorded in microseconds; report milliseconds.
                writer.writerow(
                    [
                        name,
                        stats["count"],
                        f"{stats['total_us'] / 1000:.3f}",
                        f"{avg_us / 1000:.3f}",
                        f"{stats['min_us'] / 1000:.3f}",
                        f"{stats['max_us'] / 1000:.3f}",
                        f"{pct:.2f}",
                    ]
                )

        log_rank_0(f"[TraceLens] Generated CSV summary: {report_name} ({len(sorted_ops)} operations)")
        return output_path

    except json.JSONDecodeError as e:
        warning_rank_0(f"[TraceLens] Failed to parse trace JSON: {e}")
        return None
    except Exception as e:
        # Best-effort fallback path: never raise out of summary generation.
        warning_rank_0(f"[TraceLens] Error generating CSV summary: {e}")
        return None


def generate_tracelens_reports(
    tensorboard_dir: str,
    output_dir: str,
    ranks: Optional[List[int]] = None,
    output_format: str = "xlsx",
    auto_install: bool = True,
) -> List[str]:
    """
    Generate TraceLens analysis reports for trace files.

    Args:
        tensorboard_dir: Directory containing PyTorch profiler trace files
        output_dir: Directory to save the generated reports
        ranks: List of ranks to generate reports for (None = all ranks)
            To limit number of reports, specify fewer ranks in the list
        output_format: Output format:
            - "xlsx" (default): Multi-tab Excel; single parse, fastest
            - "csv": Multiple CSV files per rank (kernels, memory, comm, etc.)
              saved under {output_dir}/{report_name}/*.csv.
            - "all": Both XLSX and CSV; trace parsed twice (~2x processing time).
              XLSX: {output_dir}/{report_name}_analysis.xlsx
              CSVs: {output_dir}/{report_name}/*.csv
        auto_install: Whether to attempt auto-installing TraceLens if missing

    Returns:
        List of paths to all generated report files
    """
    # Normalize and validate ranks (config/CLI can pass as a string)
    ranks = _normalize_tracelens_ranks(ranks)
    # [] (no valid ranks) is distinct from None (no filter) — bail out only on [].
    if ranks == []:
        warning_rank_0("[TraceLens] No valid ranks after validation; skipping report generation.")
        return []

    output_format = _normalize_tracelens_output_format(output_format)

    # Try to install tracelens, but continue with fallback if not available
    _ensure_tracelens_installed(auto_install=auto_install)

    trace_files = _get_all_trace_files(tensorboard_dir)
    if not trace_files:
        log_rank_0("[TraceLens] No trace files found for analysis")
        return []

    # Filter by ranks if specified
    if ranks is not None:
        original_count = len(trace_files)
        trace_files = _filter_traces_by_rank(trace_files, ranks)
        log_rank_0(f"[TraceLens] Filtered to {len(trace_files)} trace files for ranks: {ranks}")
        if not trace_files and original_count > 0:
            warning_rank_0(
                f"[TraceLens] Warning: No trace files match the specified ranks {ranks}. "
                f"Found {original_count} trace files but none matched. "
                "Check that the rank numbers are correct."
            )

    log_rank_0(
        f"[TraceLens] Generating {output_format.upper()} reports for {len(trace_files)} trace files..."
    )

    generated_reports = []
    for trace_file in trace_files:
        # generate_tracelens_report now returns a list of files
        report_paths = generate_tracelens_report(
            trace_file, output_dir, output_format=output_format, auto_install=auto_install
        )
        generated_reports.extend(report_paths)

    log_rank_0(
        f"[TraceLens] Generated {len(generated_reports)} report item(s) " f"from {len(trace_files)} traces"
    )
    return generated_reports


def generate_tracelens_reports_locally(
    tensorboard_dir: str,
    exp_root_path: str,
    ranks: Optional[List[int]] = None,
    output_format: str = "xlsx",
    auto_install: bool = True,
) -> int:
    """
    Generate TraceLens analysis reports locally (without MLflow upload).

    This function generates TraceLens reports and saves them to
    exp_root_path/tracelens_reports/ for local inspection.

    Args:
        tensorboard_dir: Directory containing PyTorch profiler trace files
        exp_root_path: Root path of the experiment (for saving reports)
        ranks: List of ranks to analyze (None = all ranks, [0] = rank 0 only)
            Specify fewer ranks to limit number of reports
        output_format: Report format - "xlsx" (default), "csv", or "all" (xlsx+csv, ~2x time).
            For "all": XLSX at {exp_root_path}/tracelens_reports/{report_name}_analysis.xlsx
            and CSVs under {exp_root_path}/tracelens_reports/{report_name}/*.csv
        auto_install: Whether to attempt auto-installing TraceLens if missing

    Returns:
        Number of reports generated

    Example:
        >>> generate_tracelens_reports_locally(
        ...     tensorboard_dir="/path/to/tensorboard",
        ...     exp_root_path="/path/to/experiment",
        ...     ranks=[0, 8],  # Only 2 ranks = 2 reports
        ...     output_format="all"
        ...
        )
        26  # Generated 26 report files (XLSX + CSVs for 2 ranks)
    """
    # Create output directory for reports
    reports_dir = os.path.join(exp_root_path, "tracelens_reports")
    os.makedirs(reports_dir, exist_ok=True)

    log_rank_0(f"[TraceLens] Generating reports from traces in: {tensorboard_dir}")
    log_rank_0(f"[TraceLens] Reports will be saved to: {reports_dir}")
    if ranks:
        log_rank_0(f"[TraceLens] Analyzing ranks: {ranks}")

    # Generate reports
    reports = generate_tracelens_reports(
        tensorboard_dir=tensorboard_dir,
        output_dir=reports_dir,
        ranks=ranks,
        output_format=output_format,
        auto_install=auto_install,
    )

    if not reports:
        log_rank_0("[TraceLens] No reports generated")
        return 0

    log_rank_0(f"[TraceLens] Generated {len(reports)} report files locally")
    return len(reports)


def upload_tracelens_reports_to_mlflow(
    mlflow_writer,
    tensorboard_dir: str,
    exp_root_path: str,
    ranks: Optional[List[int]] = None,
    output_format: str = "xlsx",
    artifact_path: str = "trace_analysis",
    cleanup_after_upload: bool = False,
    auto_install: bool = True,
) -> int:
    """
    Generate TraceLens reports and upload them to MLflow.

    This function:
    1. Finds PyTorch profiler trace files
    2. Generates TraceLens analysis reports for specified ranks
    3. Uploads the reports to MLflow under the trace_analysis artifact path
    4. Optionally cleans up local report files after successful upload

    Args:
        mlflow_writer: The MLflow module instance (from get_mlflow_writer())
        tensorboard_dir: Directory containing PyTorch profiler trace files
        exp_root_path: Root path of the experiment (for saving reports)
        ranks: List of ranks to analyze (None = all ranks, [0] = rank 0 only)
            Specify fewer ranks to limit number of reports
        output_format: Report format - "xlsx" (default), "csv", or "all" (xlsx+csv, ~2x time).
            For "all": XLSX at {exp_root_path}/tracelens_reports/{report_name}_analysis.xlsx
            and CSVs under {exp_root_path}/tracelens_reports/{report_name}/*.csv
        artifact_path: MLflow artifact subdirectory for reports
        cleanup_after_upload: If True, removes local reports after upload to save disk space.
            If False, keeps reports locally for inspection. Default: False.
        auto_install: Whether to attempt auto-installing TraceLens if missing

    Returns:
        Number of reports uploaded to MLflow

    Note:
        Reports are saved to exp_root_path/tracelens_reports/ and kept locally by default.
        Set cleanup_after_upload=True to remove them after upload and save disk space.
    """
    if mlflow_writer is None:
        log_rank_0("[TraceLens] MLflow writer not available, skipping report upload")
        return 0

    # Normalize and validate ranks (config/CLI can pass as a string)
    ranks = _normalize_tracelens_ranks(ranks)
    # [] (no valid ranks) is distinct from None (no filter) — bail out only on [].
    if ranks == []:
        warning_rank_0("[TraceLens] No valid ranks after validation; skipping report upload.")
        return 0

    output_format = _normalize_tracelens_output_format(output_format)

    # Create output directory for reports
    reports_dir = os.path.join(exp_root_path, "tracelens_reports")
    os.makedirs(reports_dir, exist_ok=True)

    log_rank_0(f"[TraceLens] Generating reports from traces in: {tensorboard_dir}")
    log_rank_0(f"[TraceLens] Reports will be saved to: {reports_dir}")
    if ranks:
        log_rank_0(f"[TraceLens] Analyzing ranks: {ranks}")

    # Generate reports
    reports = generate_tracelens_reports(
        tensorboard_dir=tensorboard_dir,
        output_dir=reports_dir,
        ranks=ranks,
        output_format=output_format,
        auto_install=auto_install,
    )

    if not reports:
        log_rank_0("[TraceLens] No reports generated, nothing to upload")
        return 0

    # Upload reports to MLflow (files via log_artifact, dirs via log_artifacts for correct behavior)
    uploaded_count = 0
    for report_path in reports:
        try:
            if os.path.isdir(report_path):
                # Falls back to the bare directory name when artifact_path is empty.
                subpath = (
                    os.path.join(artifact_path, os.path.basename(report_path))
                    if artifact_path
                    else os.path.basename(report_path)
                )
                mlflow_writer.log_artifacts(report_path, artifact_path=subpath)
                log_rank_0(f"[MLflow] Uploaded TraceLens report dir: {os.path.basename(report_path)}")
            else:
                mlflow_writer.log_artifact(report_path, artifact_path=artifact_path)
                log_rank_0(f"[MLflow] Uploaded TraceLens report: {os.path.basename(report_path)}")
            uploaded_count += 1
        except Exception as e:
            warning_rank_0(f"[MLflow] Failed to upload report {report_path}: {e}")

    log_rank_0(
        f"[TraceLens] Uploaded {uploaded_count} report item(s) to '{artifact_path}' "
        "(each item may be a file or a directory of CSV files)"
    )

    # Optionally clean up local reports only when all uploads succeeded, to avoid losing data
    # when some uploads failed (reported via warning_rank_0 above).
    if cleanup_after_upload:
        if uploaded_count == len(reports):
            try:
                import shutil

                shutil.rmtree(reports_dir)
                log_rank_0(f"[TraceLens] Cleaned up local reports directory: {reports_dir}")
            except Exception as e:
                warning_rank_0(f"[TraceLens] Failed to cleanup reports directory: {e}")
        else:
            log_rank_0(
                f"[TraceLens] Skipping cleanup (only {uploaded_count}/{len(reports)} uploads succeeded); "
                f"keeping local reports at: {reports_dir}"
            )
    else:
        log_rank_0(f"[TraceLens] Keeping local reports at: {reports_dir}")

    return uploaded_count


# =============================================================================
# Main Entry Point
# =============================================================================


def upload_artifacts_to_mlflow(
    mlflow_writer,
    tensorboard_dir: Optional[str] = None,
    exp_root_path: Optional[str] = None,
    upload_traces: bool = True,
    upload_logs: bool = True,
    generate_tracelens_report: bool = False,
    upload_tracelens_report: bool = False,
    tracelens_ranks: Optional[List[int]] = None,
    tracelens_output_format: str = "xlsx",
    tracelens_cleanup_after_upload: bool
= False, + tracelens_auto_install: bool = True, +) -> dict: + """ + Upload all artifacts (trace files, log files, TraceLens reports) to MLflow. + + This is the main entry point for uploading artifacts to MLflow. + It handles: + - Trace files from PyTorch profiler + - Log files from training + - TraceLens analysis reports (optional - generate locally and/or upload to MLflow) + + MLflow Artifact Structure: + artifacts/ + ├── traces/ # PyTorch profiler trace files + ├── logs/ # Training log files + └── trace_analysis/ # TraceLens analysis reports (if uploaded) + + TraceLens Report Generation Logic: + - If upload_tracelens_report=True: Generate AND upload (auto-enables generation) + - If generate_tracelens_report=True and upload_tracelens_report=False: Generate locally only + - If both False: No report generation + + Examples: + generate=False, upload=False → No reports + generate=True, upload=False → Generate locally only + generate=False, upload=True → Generate AND upload (auto-enabled) + generate=True, upload=True → Generate AND upload (explicit) + + Args: + mlflow_writer: The MLflow module instance (from get_mlflow_writer()) + tensorboard_dir: Path to the tensorboard directory containing trace files + exp_root_path: Root path of the experiment for log files + upload_traces: Whether to upload trace files + upload_logs: Whether to upload log files + generate_tracelens_report: Whether to generate TraceLens reports locally + upload_tracelens_report: Whether to upload TraceLens reports to MLflow (implies generation) + tracelens_ranks: List of ranks to generate TraceLens reports for + (None = all ranks, [0, 8] = ranks 0 and 8 only) + Specify fewer ranks to limit number of reports + tracelens_output_format: Report format - "xlsx" (default), "csv", or "all" (xlsx+csv, ~2x time). 
+ For "all": XLSX at {exp_root_path}/tracelens_reports/{report_name}_analysis.xlsx + and CSVs under {exp_root_path}/tracelens_reports/{report_name}/*.csv + tracelens_cleanup_after_upload: If True, removes local reports after upload to save disk space. + If False, keeps reports locally for inspection (default). + tracelens_auto_install: Whether to attempt auto-installing TraceLens if missing + + Returns: + Dictionary with counts of uploaded files: + { + "traces": , + "logs": , + "tracelens_reports": + } + """ + if mlflow_writer is None: + log_rank_0("[MLflow] MLflow writer not available, skipping artifact upload") + return {"traces": 0, "logs": 0, "tracelens_reports": 0} + + log_rank_0("[MLflow] Starting artifact upload to MLflow...") + log_rank_0(f"[MLflow] tensorboard_dir: {tensorboard_dir}") + log_rank_0(f"[MLflow] exp_root_path: {exp_root_path}") + log_rank_0(f"[MLflow] upload_traces: {upload_traces}, upload_logs: {upload_logs}") + log_rank_0( + f"[MLflow] generate_tracelens_report: {generate_tracelens_report}, " + f"upload_tracelens_report: {upload_tracelens_report}" + ) + + result = {"traces": 0, "logs": 0, "tracelens_reports": 0} + + # Upload trace files + if upload_traces and tensorboard_dir: + result["traces"] = upload_trace_files_to_mlflow( + mlflow_writer, tensorboard_dir, artifact_path="traces" + ) + + # Upload log files + if upload_logs and exp_root_path: + result["logs"] = upload_log_files_to_mlflow(mlflow_writer, exp_root_path, artifact_path="logs") + + # TraceLens report generation and upload logic + # If upload=True, auto-enable generation (even if generate=False) + should_generate = generate_tracelens_report or upload_tracelens_report + + if should_generate and tensorboard_dir and exp_root_path: + if upload_tracelens_report: + # Generate AND upload to MLflow + log_rank_0("[TraceLens] Mode: Generate and upload to MLflow") + result["tracelens_reports"] = upload_tracelens_reports_to_mlflow( + mlflow_writer=mlflow_writer, + 
tensorboard_dir=tensorboard_dir, + exp_root_path=exp_root_path, + ranks=tracelens_ranks, + output_format=tracelens_output_format, + artifact_path="trace_analysis", + cleanup_after_upload=tracelens_cleanup_after_upload, + auto_install=tracelens_auto_install, + ) + else: + # Generate locally only (no MLflow upload) + log_rank_0("[TraceLens] Mode: Generate locally only (no MLflow upload)") + num_generated = generate_tracelens_reports_locally( + tensorboard_dir=tensorboard_dir, + exp_root_path=exp_root_path, + ranks=tracelens_ranks, + output_format=tracelens_output_format, + auto_install=tracelens_auto_install, + ) + # Don't count as "uploaded" since they're local-only + log_rank_0(f"[TraceLens] Generated {num_generated} report files (not uploaded to MLflow)") + + log_rank_0( + f"[MLflow] Artifact upload complete: " + f"{result['traces']} traces, {result['logs']} logs, " + f"{result['tracelens_reports']} TraceLens reports" + ) + + return result diff --git a/primus/backends/megatron/training/mlflow_setup.py b/primus/backends/megatron/training/mlflow_setup.py new file mode 100644 index 000000000..3846594d1 --- /dev/null +++ b/primus/backends/megatron/training/mlflow_setup.py @@ -0,0 +1,111 @@ +############################################################################### +# Copyright (c) 2025, Advanced Micro Devices, Inc. All rights reserved. +# +# See LICENSE for license information. +############################################################################### +""" +MLflow artifact upload utilities. + +This module provides functions for uploading artifacts (traces, logs, TraceLens +reports) to MLflow. Separated from global_vars.py to reduce merge conflicts. 
+""" + +from typing import List, Optional + +import torch.distributed as dist + +from .global_vars import get_mlflow_writer, get_primus_args +from .mlflow_artifacts import ( + generate_tracelens_reports_locally, + upload_artifacts_to_mlflow, +) + + +def upload_mlflow_artifacts( + tensorboard_dir: Optional[str] = None, + exp_root_path: Optional[str] = None, + upload_traces: bool = True, + upload_logs: bool = True, + generate_tracelens_report: bool = False, + upload_tracelens_report: bool = False, + tracelens_ranks: Optional[List[int]] = None, + tracelens_output_format: str = "xlsx", + tracelens_cleanup_after_upload: bool = False, + tracelens_auto_install: bool = True, +) -> Optional[dict]: + """ + Upload trace files, log files, and TraceLens reports to MLflow as artifacts. + + This function should be called at the end of training to upload all + artifacts to MLflow. It is safe to call on all ranks: non-writer ranks + will no-op when MLflow is disabled, while the writer rank performs uploads + (and local-only TraceLens generation may still occur when configured). 
+ + MLflow Artifact Structure: + artifacts/ + ├── traces/ # PyTorch profiler trace files + ├── logs/ # Training log files + └── trace_analysis/ # TraceLens analysis reports (if uploaded) + + TraceLens Report Logic: + - upload_tracelens_report=True: Generate AND upload (auto-enables generation) + - generate_tracelens_report=True only: Generate locally without upload + - Both False: No report generation + + Args: + tensorboard_dir: Path to tensorboard directory with trace files + exp_root_path: Root experiment path for log files + upload_traces: Whether to upload trace files (default: True) + upload_logs: Whether to upload log files (default: True) + generate_tracelens_report: Whether to generate TraceLens reports locally + upload_tracelens_report: Whether to upload TraceLens reports to MLflow (implies generation) + tracelens_ranks: List of ranks to analyze with TraceLens + (None = all, [0, 8] = ranks 0 and 8 only) + Specify fewer ranks to limit number of reports + tracelens_output_format: Report format - "xlsx" (default), "csv", or "all" + tracelens_cleanup_after_upload: Remove local reports after upload (default: False) + tracelens_auto_install: Whether to attempt auto-installing TraceLens if missing + + Returns: + Dictionary with counts of uploaded files, or None if MLflow is not enabled + """ + mlflow_writer = get_mlflow_writer() + if mlflow_writer is None: + # Local-only TraceLens generation: run on a single rank only to avoid duplicate + # work and races writing exp_root_path/tracelens_reports (rank 0 when multi-rank). + # If MLflow is enabled in a distributed run, the writer rank will handle generation, + # so skip local generation on non-writer ranks. 
+        try:
+            args = get_primus_args()
+            is_rank_zero = args.rank == 0
+            mlflow_expected = not getattr(args, "disable_mlflow", True)  # fixed: mlflow_run_name may stay null even when MLflow is enabled via disable_mlflow=false
+            is_distributed = args.world_size > 1
+        except Exception:
+            is_rank_zero = not dist.is_initialized() or dist.get_rank() == 0
+            mlflow_expected = False
+            is_distributed = dist.is_initialized()
+
+        should_generate_locally = is_rank_zero and (not mlflow_expected or not is_distributed)
+        if should_generate_locally and generate_tracelens_report and tensorboard_dir and exp_root_path:
+            generate_tracelens_reports_locally(
+                tensorboard_dir=tensorboard_dir,
+                exp_root_path=exp_root_path,
+                ranks=tracelens_ranks,
+                output_format=tracelens_output_format,
+                auto_install=tracelens_auto_install,
+            )
+        return None
+
+    return upload_artifacts_to_mlflow(
+        mlflow_writer=mlflow_writer,
+        tensorboard_dir=tensorboard_dir,
+        exp_root_path=exp_root_path,
+        upload_traces=upload_traces,
+        upload_logs=upload_logs,
+        generate_tracelens_report=generate_tracelens_report,
+        upload_tracelens_report=upload_tracelens_report,
+        tracelens_ranks=tracelens_ranks,
+        tracelens_output_format=tracelens_output_format,
+        tracelens_cleanup_after_upload=tracelens_cleanup_after_upload,
+        tracelens_auto_install=tracelens_auto_install,
+    )
diff --git a/primus/configs/modules/megatron/primus_megatron_module.yaml b/primus/configs/modules/megatron/primus_megatron_module.yaml
index 0ec3a22b0..7a1877d3e 100644
--- a/primus/configs/modules/megatron/primus_megatron_module.yaml
+++ b/primus/configs/modules/megatron/primus_megatron_module.yaml
@@ -5,6 +5,38 @@
 disable_wandb: true
 disable_mlflow: true
 mlflow_run_name: null
 mlflow_experiment_name: null
+# When disable_mlflow=false, set these to true to upload traces/logs to MLflow.
+# Default false so MLflow remains opt-in and disable_mlflow is respected.
+mlflow_upload_traces: false # Upload profiler trace files to MLflow +mlflow_upload_logs: false # Upload training log files to MLflow + +# TraceLens Report Generation & Upload +# ---------------------------------------------------------------------------- +# Supported options: generate_tracelens_report, mlflow_upload_tracelens_report, +# mlflow_tracelens_ranks, mlflow_tracelens_output_format, mlflow_tracelens_cleanup_after_upload +# +# generate_tracelens_report: Generate TraceLens analysis reports locally +# mlflow_upload_tracelens_report: Upload reports to MLflow (auto-enables generation) +# +# Usage patterns: +# generate=false, upload=false -> No reports generated +# generate=true, upload=false -> Generate reports locally only +# generate=false, upload=true -> Generate AND upload (auto-enabled) +# generate=true, upload=true -> Generate AND upload (explicit) +# +# To limit number of reports: use mlflow_tracelens_ranks (no separate max_reports option). +# Default: null = all ranks +# Example (8 GPUs/node): [0, 8] = one rank per node (assumes 8 GPUs/node) +# Use an explicit list for specific ranks (e.g., [0, 1, 2]) +# ---------------------------------------------------------------------------- +generate_tracelens_report: false # Generate TraceLens analysis reports locally (auto-enabled when upload=true) +mlflow_upload_tracelens_report: false # Upload TraceLens reports to MLflow (auto-enables generation, profiling, tensorboard) +mlflow_tracelens_ranks: null # List of ranks to analyze (null = all, e.g. 
[0, 1, 2]) +# TraceLens report format: xlsx (default; single parse, fastest), csv, or all (xlsx+csv; +# parses each trace twice so ~2x processing time; use only when both formats are needed) +mlflow_tracelens_output_format: xlsx +mlflow_tracelens_cleanup_after_upload: false # Keep local reports (true to cleanup and save disk space) +mlflow_tracelens_auto_install: true # Auto-install TraceLens if missing (set false to disable) disable_compile_dependencies: true # NOTE: # - If `use_rocm_mem_info = True`, ROCm memory information will be collected @@ -14,6 +46,26 @@ disable_compile_dependencies: true use_rocm_mem_info: false use_rocm_mem_info_iters: [1,2] +# MLflow performance metrics - comprehensive metrics for scaling tests +# When enabled, automatically enables throughput calculations and logs to MLflow: +# +# 1. Performance Metrics: +# - perf/throughput_tflops_per_gpu: TFLOP/s per GPU +# - perf/tps_tokens_per_sec_per_gpu: Tokens/sec per GPU +# - perf/iteration_time_ms: Time per training step (ms) +# +# 2. Memory Metrics: +# - perf/{rocm/hip}_current_mem_gb: Current GPU memory usage (GB) +# - perf/{rocm/hip}_mem_utilization_pct: Memory utilization (% of total) +# +# 3. System Metrics: +# - perf/gpu_utilization_pct_rank{N}: GPU utilization per rank (%) +# - perf/gpu_utilization_pct_avg: Average GPU utilization across all ranks (%) +# +# Note: This flag implicitly enables log_throughput behavior for metric collection. +# Note: GPU utilization collection uses all_gather on every log_interval (sync across ranks). +mlflow_upload_performance_metrics: false + # profiling disable_profiler_activity_cpu: false torch_profiler_record_shapes: true diff --git a/primus/core/utils/rocm_mem_info.py b/primus/core/utils/rocm_mem_info.py index 0e5f94dc9..5c0b30fad 100644 --- a/primus/core/utils/rocm_mem_info.py +++ b/primus/core/utils/rocm_mem_info.py @@ -4,9 +4,57 @@ # See LICENSE for license information. 
############################################################################### +import re import subprocess +def get_rocm_smi_gpu_util(device_id: int): + """ + Return current GPU utilization (0-100) for the given device via rocm-smi --showuse. + + Returns: + float: GPU use percentage (0-100), or raises on failure (caller should catch and use fallback). + """ + try: + out = subprocess.check_output( + ["rocm-smi", "--showuse", f"-d={device_id}"], + text=True, + stderr=subprocess.DEVNULL, + timeout=10, + ) + except FileNotFoundError: + raise RuntimeError("rocm-smi not found, please ensure ROCm is installed and in PATH") + except subprocess.TimeoutExpired: + raise RuntimeError("rocm-smi --showuse timed out") + except subprocess.CalledProcessError as e: + output = e.output.strip() if isinstance(e.output, str) and e.output else "No output captured." + raise RuntimeError(f"rocm-smi --showuse failed with exit code {e.returncode}. Output: {output}") + + # Parse output: look for GPU use (%) or similar (e.g. "GPU use (%): 42" or "GPU Use: 42%") + for line in out.splitlines(): + line_lower = line.lower() + if "use" not in line_lower and "busy" not in line_lower: + continue + # Prefer a number that follows a use/busy label. + labeled_match = re.search( + r"\b(?:use|busy)\b[^0-9%]*[:=]\s*([0-9]+(?:\.[0-9]+)?)\s*%?", + line_lower, + ) + if labeled_match: + val = float(labeled_match.group(1)) + if 0 <= val <= 100: + return val + + # Otherwise, take the last percentage on the line to avoid grabbing GPU index. 
+ percent_numbers = re.findall(r"(\d+(?:\.\d+)?)\s*%", line) + for n in reversed(percent_numbers): + val = float(n) + if 0 <= val <= 100: + return val + + raise RuntimeError(f"rocm-smi --showuse did not report a GPU use percentage for device {device_id}") + + def get_rocm_smi_mem_info(device_id: int): try: out = subprocess.check_output(["rocm-smi", "--showmeminfo", "vram", f"-d={device_id}"], text=True) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 5db59188b..7fc72d2d0 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -148,9 +148,10 @@ set_primus_global_variables, set_train_start_time, ) +from primus.backends.megatron.training.mlflow_setup import upload_mlflow_artifacts from primus.backends.megatron.training.tokenizer.tokenizer import build_tokenizer from primus.core.utils import checker, file_utils -from primus.core.utils.rocm_mem_info import get_rocm_smi_mem_info +from primus.core.utils.rocm_mem_info import get_rocm_smi_gpu_util, get_rocm_smi_mem_info from primus.core.utils.yaml_utils import nested_namespace_to_dict from primus.modules.base_module import BaseModule from primus.modules.module_utils import ( @@ -395,6 +396,29 @@ def update_primus_config( else: log_rank_0(f"-{latest_file} does not exist, skip auto_continue_train.") + # Auto-enable profiling and tensorboard when traces are needed: for MLflow upload + # (only if MLflow is enabled) or for local TraceLens report generation. + # Without this, generate_tracelens_report=True with profile=False would produce no traces. 
+ needs_profiling = ( + ( + getattr(args, "mlflow_upload_traces", False) + or getattr(args, "mlflow_upload_tracelens_report", False) + ) + and not args.disable_mlflow + ) or getattr(args, "generate_tracelens_report", False) + if needs_profiling: + if not getattr(args, "profile", False): + args.profile = True + debug_rank_0("Auto-enabled profile=True for trace/tracelens (upload or local generation)") + if not getattr(args, "use_pytorch_profiler", False): + args.use_pytorch_profiler = True + debug_rank_0( + "Auto-enabled use_pytorch_profiler=True for trace/tracelens (upload or local generation)" + ) + if getattr(args, "disable_tensorboard", True): + args.disable_tensorboard = False + debug_rank_0("Auto-enabled tensorboard (disable_tensorboard=False) for profiler trace output") + # tensorboard if not args.disable_tensorboard: tb_path = os.path.abspath(os.path.join(exp_root_path, "tensorboard")) @@ -1120,8 +1144,26 @@ def run(self, *args, **kwargs): ft_integration.on_checkpointing_end(is_async_finalization=True) mlflow_writer = get_mlflow_writer() - if mlflow_writer: - mlflow_writer.end_run() + # Always call: uploads to MLflow when enabled; when MLflow disabled, still runs + # local-only TraceLens report generation if generate_tracelens_report=True. 
+ try: + upload_mlflow_artifacts( + tensorboard_dir=args.tensorboard_dir, + exp_root_path=self.exp_root_path, + upload_traces=getattr(args, "mlflow_upload_traces", False), + upload_logs=getattr(args, "mlflow_upload_logs", False), + generate_tracelens_report=getattr(args, "generate_tracelens_report", False), + upload_tracelens_report=getattr(args, "mlflow_upload_tracelens_report", False), + tracelens_ranks=getattr(args, "mlflow_tracelens_ranks", None), + tracelens_output_format=getattr(args, "mlflow_tracelens_output_format", "xlsx"), + tracelens_cleanup_after_upload=getattr(args, "mlflow_tracelens_cleanup_after_upload", False), + tracelens_auto_install=getattr(args, "mlflow_tracelens_auto_install", True), + ) + except Exception as e: + warning_rank_0(f"[MLflow] Artifact upload failed: {e}") + finally: + if mlflow_writer: + mlflow_writer.end_run() one_logger and one_logger.log_metrics({"app_finish_time": one_logger_utils.get_timestamp_in_ms()}) @@ -1961,13 +2003,24 @@ def training_log( if iteration % args.log_interval == 0: # Note(wenx): If we want to collect rocm-smi memory information for the first two iterations, # place the collection before the timer to minimize its impact on latency measurements for iterations ≥ 3. 
- if args.log_throughput: + rocm_gpu_util = None + # Enable throughput calculations if log_throughput or mlflow_upload_performance_metrics is set + enable_perf_metrics = args.log_throughput or getattr( + args, "mlflow_upload_performance_metrics", False + ) + if enable_perf_metrics: if args.use_rocm_mem_info or ( args.use_rocm_mem_info_iters is not None and iteration in args.use_rocm_mem_info_iters ): rocm_total_mem, rocm_used_mem, rocm_free_mem = get_rocm_smi_mem_info( self.module_local_rank ) + # Collect GPU utilization for performance metrics + if getattr(args, "mlflow_upload_performance_metrics", False): + try: + rocm_gpu_util = get_rocm_smi_gpu_util(self.module_local_rank) + except Exception: + rocm_gpu_util = None elapsed_time = timers("interval-time").elapsed(barrier=True) elapsed_time_per_iteration = elapsed_time / total_iterations @@ -2004,7 +2057,7 @@ def training_log( elapsed_time_per_iteration * 1000.0, statistics.mean(self.recent_iteration_times), ) - if args.log_throughput: + if enable_perf_metrics: if ( iteration == self.log_avg_skip_iterations + 1 or len(self.recent_tflop_throughputs) >= self.log_avg_reset_interval @@ -2146,6 +2199,77 @@ def training_log( mlflow_writer.log_metric( f"{mem_collector}_mem_usage_percent", mem_usage * 100.0, iteration ) + + # Upload performance metrics to MLflow + # Groups: Performance (throughput, TPS, iteration time), Memory (peak, usage %), System (GPU util) + # NOTE: mlflow_writer only exists on last rank, but all_gather requires all ranks to participate + if getattr(args, "mlflow_upload_performance_metrics", False): + # Ensure memory metrics are available when log_timers_to_tensorboard is False + # (mem_collector, used_mem, mem_usage are otherwise only set inside log_timers_to_tensorboard) + if not args.log_timers_to_tensorboard: + hip_free_mem, hip_total_mem = torch.cuda.mem_get_info() + used_mem = hip_total_mem - hip_free_mem + mem_usage = used_mem / hip_total_mem + mem_collector = "hip" + if args.use_rocm_mem_info 
or (
+                        args.use_rocm_mem_info_iters is not None
+                        and iteration in args.use_rocm_mem_info_iters
+                    ):
+                        mem_collector = "rocm"
+                        used_mem = rocm_used_mem
+                        mem_usage = rocm_used_mem / rocm_total_mem  # was rocm_mem_usage: undefined on this path (only set inside log_timers_to_tensorboard) -> NameError
+                    # System metrics - GPU utilization per rank
+                    # ALL ranks must participate in all_gather, even if they don't have mlflow_writer
+                    # Use -1 as sentinel for unavailable GPU util
+                    util_value = rocm_gpu_util if rocm_gpu_util is not None else -1.0
+                    util_tensor = torch.tensor([util_value], device="cuda", dtype=torch.float32)
+                    world_size = dist.get_world_size()
+                    gathered_utils = [torch.zeros_like(util_tensor) for _ in range(world_size)]
+                    dist.all_gather(gathered_utils, util_tensor)
+
+                    # Only the last rank (which has mlflow_writer) logs the metrics
+                    if mlflow_writer:
+                        # Performance metrics
+                        mlflow_writer.log_metric("perf/throughput_tflops_per_gpu", throughput, iteration)
+                        mlflow_writer.log_metric(
+                            "perf/tps_tokens_per_sec_per_gpu", token_throughput, iteration
+                        )
+                        mlflow_writer.log_metric(
+                            "perf/iteration_time_ms",
+                            elapsed_time_per_iteration * 1000.0,
+                            iteration,
+                        )
+                        # Memory metrics
+                        mlflow_writer.log_metric(
+                            f"perf/{mem_collector}_current_mem_gb",
+                            used_mem / 1024 / 1024 / 1024,
+                            iteration,
+                        )
+                        mlflow_writer.log_metric(
+                            f"perf/{mem_collector}_mem_utilization_pct",
+                            mem_usage * 100.0,
+                            iteration,
+                        )
+                        # Log GPU utilization from gathered values
+                        valid_utils = []
+                        for rank, util_val in enumerate(gathered_utils):
+                            util = util_val.item()
+                            if util >= 0:  # Filter out sentinel values (-1)
+                                mlflow_writer.log_metric(
+                                    f"perf/gpu_utilization_pct_rank{rank}",
+                                    util,
+                                    iteration,
+                                )
+                                valid_utils.append(util)
+                        # Also log average GPU utilization (only from valid values)
+                        if valid_utils:
+                            avg_util = sum(valid_utils) / len(valid_utils)
+                            mlflow_writer.log_metric(
+                                "perf/gpu_utilization_pct_avg",
+                                avg_util,
+                                iteration,
+                            )
+
             assert learning_rate is not None
             # Decoupled_learning_rate should be not None only on first and last pipeline stage.
log_string += " learning rate: {:.6E} |".format(learning_rate) diff --git a/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py new file mode 100644 index 000000000..b647bad68 --- /dev/null +++ b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py @@ -0,0 +1,660 @@ +############################################################################### +# Copyright (c) 2025, Advanced Micro Devices, Inc. +# +# See LICENSE for license information. +############################################################################### + +""" +Unit tests for primus.backends.megatron.training.mlflow_artifacts. + +Covers: +- Trace file discovery and filtering +- Rank extraction from filenames +- Report generation with mocked TraceLens API +- Upload logic with mocked mlflow_writer (file vs directory) +- Error handling and fallback behavior +- Cleanup logic validation +""" + +import os +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from primus.backends.megatron.training import mlflow_artifacts as mlflow_artifacts_mod + +# Use the module reference for patching in tests +_MODULE = "primus.backends.megatron.training.mlflow_artifacts" + + +@pytest.fixture(autouse=True) +def suppress_logging(): + """Suppress log_rank_0 and warning_rank_0 in all tests.""" + with patch(f"{_MODULE}.log_rank_0"), patch(f"{_MODULE}.warning_rank_0"): + yield + + +# ----------------------------------------------------------------------------- +# Trace file discovery +# ----------------------------------------------------------------------------- + + +class TestGetAllTraceFiles: + """Test _get_all_trace_files discovery and filtering.""" + + def test_returns_empty_for_none_path(self): + out = mlflow_artifacts_mod._get_all_trace_files(None) + assert out == [] + + def test_returns_empty_for_empty_string(self): + out = mlflow_artifacts_mod._get_all_trace_files("") + assert out 
== [] + + def test_returns_empty_for_missing_directory(self, tmp_path): + missing = tmp_path / "does_not_exist" + assert not missing.exists() + out = mlflow_artifacts_mod._get_all_trace_files(str(missing)) + assert out == [] + + def test_finds_pt_trace_json_in_root(self, tmp_path): + (tmp_path / "rank_0_step_1.pt.trace.json").write_text("{}") + (tmp_path / "rank_1_step_1.pt.trace.json").write_text("{}") + (tmp_path / "other.json").write_text("{}") + out = mlflow_artifacts_mod._get_all_trace_files(str(tmp_path)) + assert len(out) == 2 + basenames = {os.path.basename(p) for p in out} + assert basenames == {"rank_0_step_1.pt.trace.json", "rank_1_step_1.pt.trace.json"} + + def test_finds_pt_trace_json_gz(self, tmp_path): + (tmp_path / "rank_0.pt.trace.json.gz").write_text("") + out = mlflow_artifacts_mod._get_all_trace_files(str(tmp_path)) + assert len(out) == 1 + assert out[0].endswith("rank_0.pt.trace.json.gz") + + def test_finds_traces_recursively_in_subdirs(self, tmp_path): + sub = tmp_path / "sub" + sub.mkdir() + (sub / "rank_2.pt.trace.json").write_text("{}") + out = mlflow_artifacts_mod._get_all_trace_files(str(tmp_path)) + assert len(out) == 1 + assert "rank_2" in out[0] + + def test_deduplicates_results(self, tmp_path): + (tmp_path / "a.pt.trace.json").write_text("{}") + out = mlflow_artifacts_mod._get_all_trace_files(str(tmp_path)) + assert len(out) == 1 + + +# ----------------------------------------------------------------------------- +# Rank extraction from filenames +# ----------------------------------------------------------------------------- + + +class TestExtractRankFromFilename: + """Test _extract_rank_from_filename patterns.""" + + def test_rank_underscore_number_underscore(self): + assert mlflow_artifacts_mod._extract_rank_from_filename("rank_0_step_2.json.gz") == 0 + assert mlflow_artifacts_mod._extract_rank_from_filename("rank_15_step_1.pt.trace.json") == 15 + + def test_rank_underscore_number_dot(self): + """Match rank_N. 
    # NOTE(review): the `def` line of this first test starts before this chunk;
    # the original method name is not visible here, so a placeholder name is
    # used — confirm against the full file.  The docstring and asserts below
    # are kept verbatim.
    def test_rank_number_dot_suffix(self):
        """(dot after rank), e.g. rank_0.pt.trace.json.gz used by PyTorch profiler."""
        assert mlflow_artifacts_mod._extract_rank_from_filename("rank_0.pt.trace.json") == 0
        assert mlflow_artifacts_mod._extract_rank_from_filename("rank_0.pt.trace.json.gz") == 0
        assert mlflow_artifacts_mod._extract_rank_from_filename("rank_8.pt.trace.json") == 8

    def test_rank_bracket_number_bracket(self):
        # `rank[N]` style used by some launcher-generated trace names.
        assert (
            mlflow_artifacts_mod._extract_rank_from_filename("primus-megatron-exp-rank[0].pt.trace.json") == 0
        )
        assert mlflow_artifacts_mod._extract_rank_from_filename("prefix-rank[7].json") == 7

    def test_dash_rank_number_dot(self):
        assert mlflow_artifacts_mod._extract_rank_from_filename("trace-rank1.json") == 1

    def test_underscore_rank_number_dot(self):
        assert mlflow_artifacts_mod._extract_rank_from_filename("trace_rank2.json") == 2

    def test_returns_none_for_unknown_pattern(self):
        # Filenames with no recognizable rank token must yield None, not 0.
        assert mlflow_artifacts_mod._extract_rank_from_filename("random_file.json") is None
        assert mlflow_artifacts_mod._extract_rank_from_filename("trace.json.gz") is None


# -----------------------------------------------------------------------------
# Filter traces by rank
# -----------------------------------------------------------------------------


class TestFilterTracesByRank:
    """Test _filter_traces_by_rank."""

    def test_returns_all_when_ranks_none(self, tmp_path):
        # ranks=None means "no filtering": every path comes back unchanged.
        paths = [
            str(tmp_path / "rank_0.pt.trace.json"),
            str(tmp_path / "rank_1.pt.trace.json"),
        ]
        out = mlflow_artifacts_mod._filter_traces_by_rank(paths, None)
        assert out == paths

    def test_returns_empty_when_ranks_empty_list(self, tmp_path):
        # An explicit empty list selects nothing (distinct from None above).
        paths = [str(tmp_path / "rank_0.pt.trace.json")]
        out = mlflow_artifacts_mod._filter_traces_by_rank(paths, [])
        assert out == []

    def test_filters_to_specified_ranks(self, tmp_path):
        paths = [
            str(tmp_path / "rank_0_step_1.pt.trace.json"),
            str(tmp_path / "rank_1_step_1.pt.trace.json"),
            str(tmp_path / "rank_2_step_1.pt.trace.json"),
        ]
        out = mlflow_artifacts_mod._filter_traces_by_rank(paths, [0, 2])
        assert len(out) == 2
        assert "rank_0" in out[0]
        assert "rank_2" in out[1]


# -----------------------------------------------------------------------------
# Normalize TraceLens inputs
# -----------------------------------------------------------------------------


class TestNormalizeTracelensInputs:
    """Test TraceLens input normalization helpers."""

    def test_normalize_ranks_none(self):
        assert mlflow_artifacts_mod._normalize_tracelens_ranks(None) is None

    def test_normalize_ranks_string_list(self):
        # String-encoded list (e.g. from CLI/env) is parsed; numeric strings
        # inside it are coerced to int.
        ranks = mlflow_artifacts_mod._normalize_tracelens_ranks("[0, 2, '3']")
        assert ranks == [0, 2, 3]

    def test_normalize_ranks_invalid_string(self):
        assert mlflow_artifacts_mod._normalize_tracelens_ranks("not a list") is None

    def test_normalize_ranks_filters_invalid_and_world_size(self, monkeypatch):
        # With WORLD_SIZE=2 only ranks 0 and 1 are valid; negatives and
        # non-numeric entries are dropped (note bool True == 1 in Python).
        monkeypatch.setenv("WORLD_SIZE", "2")
        ranks = mlflow_artifacts_mod._normalize_tracelens_ranks([0, 1, 2, -1, "x", True])
        assert ranks == [0, 1]

    def test_normalize_output_format_none(self):
        # Default format when unspecified is xlsx.
        assert mlflow_artifacts_mod._normalize_tracelens_output_format(None) == "xlsx"

    def test_normalize_output_format_valid(self):
        # Case-insensitive acceptance of known formats.
        assert mlflow_artifacts_mod._normalize_tracelens_output_format("CSV") == "csv"

    def test_normalize_output_format_invalid(self):
        # Unknown formats fall back to the xlsx default rather than raising.
        assert mlflow_artifacts_mod._normalize_tracelens_output_format("pdf") == "xlsx"


# -----------------------------------------------------------------------------
# Report generation with mocked TraceLens
# -----------------------------------------------------------------------------


class TestGenerateTracelensReport:
    """Test generate_tracelens_report with mocked TraceLens."""

    def _install_fake_tracelens(self, mock_generate, xlsx_path=None, csv_dir=None):
        """Put fake TraceLens.Reporting into sys.modules so generate_tracelens_report can import it."""
        # Register stub modules so `import TraceLens.Reporting` inside the
        # module under test resolves to our mock instead of the real package.
        reporting = types.ModuleType("TraceLens.Reporting")
        reporting.generate_perf_report_pytorch = mock_generate
        tracelens = types.ModuleType("TraceLens")
        tracelens.Reporting = reporting
        sys.modules["TraceLens"] = tracelens
        sys.modules["TraceLens.Reporting"] = reporting

        # Side effect mimics real report generation: it writes output files
        # on disk.  Passing xlsx_path/csv_dir as False explicitly disables the
        # corresponding branch (hence the `is not False` checks).
        def _side_effect(trace_file, output_xlsx_path=None, output_csvs_dir=None):
            if output_xlsx_path and xlsx_path is not False:
                Path(output_xlsx_path).parent.mkdir(parents=True, exist_ok=True)
                Path(output_xlsx_path).write_text("xlsx")
                return [{"tab1"}, {"tab2"}]
            if output_csvs_dir and csv_dir is not False:
                Path(output_csvs_dir).mkdir(parents=True, exist_ok=True)
                (Path(output_csvs_dir) / "kernels.csv").write_text("kernels")
                (Path(output_csvs_dir) / "memory.csv").write_text("memory")
            return []

        mock_generate.side_effect = _side_effect
        return reporting

    def teardown_method(self):
        # Purge the fake TraceLens modules so later tests re-import cleanly.
        for key in list(sys.modules.keys()):
            if key == "TraceLens" or key.startswith("TraceLens."):
                del sys.modules[key]

    def test_report_generation_xlsx_with_mocked_tracelens(self, tmp_path):
        trace_file = tmp_path / "rank_0.pt.trace.json"
        trace_file.write_text('{"traceEvents": []}')
        output_dir = tmp_path / "reports"
        mock_gen = MagicMock()

        self._install_fake_tracelens(mock_gen, xlsx_path=True, csv_dir=None)
        # try/finally guarantees sys.modules cleanup even if the asserts fail
        # (in addition to pytest's automatic teardown_method call).
        try:
            with patch(f"{_MODULE}._ensure_openpyxl_installed"):
                result = mlflow_artifacts_mod.generate_tracelens_report(
                    str(trace_file), str(output_dir), output_format="xlsx"
                )
            assert len(result) == 1
            assert result[0].endswith("_analysis.xlsx")
            assert mock_gen.called
        finally:
            self.teardown_method()

    def test_report_generation_missing_trace_file(self, tmp_path):
        # A missing trace must produce no reports and a warning, not raise.
        missing = tmp_path / "missing.pt.trace.json"
        assert not missing.exists()
        with patch(f"{_MODULE}.warning_rank_0") as warn:
            result = mlflow_artifacts_mod.generate_tracelens_report(
                str(missing), str(tmp_path), output_format="xlsx"
            )
        assert result == []
        assert warn.called
# -----------------------------------------------------------------------------
# Fallback CSV when TraceLens fails / not available
# -----------------------------------------------------------------------------


class TestGenerateTraceSummaryCsvFallback:
    """Test _generate_trace_summary_csv fallback."""

    def test_fallback_csv_from_valid_trace_json(self, tmp_path):
        # Minimal Chrome-trace JSON with two events of the same kernel name;
        # the fallback should aggregate them into the summary CSV.
        trace_file = tmp_path / "rank_0.pt.trace.json"
        trace_file.write_text(
            '{"traceEvents": ['
            '{"name": "kernel1", "cat": "kernel", "dur": 100},'
            '{"name": "kernel1", "cat": "kernel", "dur": 200}'
            "]}"
        )
        out_dir = tmp_path / "out"
        out_dir.mkdir()
        # Silence rank-0 logging helpers so output stays clean.
        with patch(f"{_MODULE}.log_rank_0"), patch(f"{_MODULE}.warning_rank_0"):
            path = mlflow_artifacts_mod._generate_trace_summary_csv(
                str(trace_file), str(out_dir), "summary.csv"
            )
        assert path is not None
        assert path.endswith("summary.csv")
        assert os.path.exists(path)
        content = Path(path).read_text()
        assert "kernel1" in content
        # Exact header wording is not pinned; either aggregate column works.
        assert "Count" in content or "Total" in content

    def test_fallback_returns_none_for_missing_file(self, tmp_path):
        with patch(f"{_MODULE}.warning_rank_0"):
            path = mlflow_artifacts_mod._generate_trace_summary_csv(
                str(tmp_path / "missing.json"), str(tmp_path), "out.csv"
            )
        assert path is None

    def test_fallback_returns_none_for_empty_events(self, tmp_path):
        # A syntactically valid trace with no events yields no summary.
        trace_file = tmp_path / "empty.pt.trace.json"
        trace_file.write_text('{"traceEvents": []}')
        with patch(f"{_MODULE}.warning_rank_0"):
            path = mlflow_artifacts_mod._generate_trace_summary_csv(str(trace_file), str(tmp_path), "out.csv")
        assert path is None


# -----------------------------------------------------------------------------
# Upload logic with mocked mlflow_writer
# -----------------------------------------------------------------------------


class TestUploadTraceFilesToMlflow:
    """Test upload_trace_files_to_mlflow with mocked writer."""

    def test_returns_zero_when_mlflow_writer_none(self, tmp_path):
        # No writer configured -> nothing uploaded, graceful zero return.
        count = mlflow_artifacts_mod.upload_trace_files_to_mlflow(None, str(tmp_path), artifact_path="traces")
        assert count == 0

    def test_uploads_found_traces_and_returns_count(self, tmp_path):
        (tmp_path / "rank_0.pt.trace.json").write_text("{}")
        (tmp_path / "rank_1.pt.trace.json").write_text("{}")
        mock_writer = MagicMock()
        with patch(f"{_MODULE}.log_rank_0"):
            count = mlflow_artifacts_mod.upload_trace_files_to_mlflow(
                mock_writer, str(tmp_path), artifact_path="traces"
            )
        # One log_artifact call per discovered trace file.
        assert count == 2
        assert mock_writer.log_artifact.call_count == 2


class TestUploadTracelensReportsToMlflow:
    """Test upload_tracelens_reports_to_mlflow: file vs dir, cleanup, errors."""

    def test_returns_zero_when_mlflow_writer_none(self, tmp_path):
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports",
            return_value=[],
        ):
            count = mlflow_artifacts_mod.upload_tracelens_reports_to_mlflow(
                None,
                str(tmp_path),
                str(tmp_path),
                ranks=[0],
                output_format="xlsx",
            )
        assert count == 0

    def test_uses_log_artifact_for_files_and_log_artifacts_for_dirs(self, tmp_path):
        # One xlsx file report and one CSV directory report: files go through
        # log_artifact, directories through log_artifacts.
        file_report = tmp_path / "rank_0_analysis.xlsx"
        file_report.write_text("xlsx")
        dir_report = tmp_path / "rank_0"
        dir_report.mkdir()
        (dir_report / "kernels.csv").write_text("csv")
        reports = [str(file_report), str(dir_report)]
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports",
            return_value=reports,
        ):
            count = mlflow_artifacts_mod.upload_tracelens_reports_to_mlflow(
                mock_writer,
                str(tmp_path),
                str(tmp_path),
                ranks=[0],
                output_format="xlsx",
                artifact_path="trace_analysis",
            )
        assert count == 2
        mock_writer.log_artifact.assert_called_once()
        mock_writer.log_artifacts.assert_called_once()
        # Directory should be logged with subpath preserving name
        call_kw = mock_writer.log_artifacts.call_args[1]
        assert "artifact_path" in call_kw
        assert "rank_0" in call_kw["artifact_path"] or call_kw["artifact_path"] == "rank_0"

    def test_upload_failure_on_one_report_still_uploads_others(self, tmp_path):
        file1 = tmp_path / "r0.xlsx"
        file1.write_text("a")
        file2 = tmp_path / "r1.xlsx"
        file2.write_text("b")
        reports = [str(file1), str(file2)]
        mock_writer = MagicMock()
        # First upload succeeds, second raises: the failure must be contained.
        mock_writer.log_artifact.side_effect = [None, Exception("upload failed")]
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports",
            return_value=reports,
        ), patch(f"{_MODULE}.warning_rank_0"):
            count = mlflow_artifacts_mod.upload_tracelens_reports_to_mlflow(
                mock_writer,
                str(tmp_path),
                str(tmp_path),
                artifact_path="trace_analysis",
            )
        # Only the successful upload is counted, but both were attempted.
        assert count == 1
        assert mock_writer.log_artifact.call_count == 2

    def test_cleanup_after_upload_calls_rmtree(self, tmp_path):
        reports_dir = tmp_path / "tracelens_reports"
        reports_dir.mkdir()
        file_report = reports_dir / "rank_0_analysis.xlsx"
        file_report.write_text("xlsx")
        mock_writer = MagicMock()
        # shutil.rmtree is patched so the test only observes the call, it does
        # not actually delete anything.
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports",
            return_value=[str(file_report)],
        ), patch("shutil.rmtree") as mock_rmtree:
            mlflow_artifacts_mod.upload_tracelens_reports_to_mlflow(
                mock_writer,
                str(tmp_path),
                str(tmp_path),
                artifact_path="trace_analysis",
                cleanup_after_upload=True,
            )
        mock_rmtree.assert_called_once()
        assert "tracelens_reports" in str(mock_rmtree.call_args[0][0])

    def test_no_cleanup_when_cleanup_after_upload_false(self, tmp_path):
        reports_dir = tmp_path / "tracelens_reports"
        reports_dir.mkdir()
        (reports_dir / "r0.xlsx").write_text("x")
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports",
            return_value=[str(reports_dir / "r0.xlsx")],
        ), patch("shutil.rmtree") as mock_rmtree:
            mlflow_artifacts_mod.upload_tracelens_reports_to_mlflow(
                mock_writer,
                str(tmp_path),
                str(tmp_path),
                cleanup_after_upload=False,
            )
        mock_rmtree.assert_not_called()
    # Completes class TestUploadTracelensReportsToMlflow (the class statement
    # is above this chunk): reports must be kept on disk for retry/inspection
    # when any upload failed, even with cleanup_after_upload=True.
    def test_cleanup_skipped_when_some_uploads_failed(self, tmp_path):
        reports_dir = tmp_path / "tracelens_reports"
        reports_dir.mkdir()
        f1 = reports_dir / "r0.xlsx"
        f2 = reports_dir / "r1.xlsx"
        f1.write_text("a")
        f2.write_text("b")
        mock_writer = MagicMock()
        # Second upload fails -> cleanup must be skipped.
        mock_writer.log_artifact.side_effect = [None, Exception("upload failed")]
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports",
            return_value=[str(f1), str(f2)],
        ), patch("shutil.rmtree") as mock_rmtree, patch(f"{_MODULE}.warning_rank_0"):
            mlflow_artifacts_mod.upload_tracelens_reports_to_mlflow(
                mock_writer,
                str(tmp_path),
                str(tmp_path),
                artifact_path="trace_analysis",
                cleanup_after_upload=True,
            )
        mock_rmtree.assert_not_called()


# -----------------------------------------------------------------------------
# upload_artifacts_to_mlflow (main entry point)
# -----------------------------------------------------------------------------


class TestUploadArtifactsToMlflow:
    """Test upload_artifacts_to_mlflow: trace/log discovery, artifact paths, TraceLens logic, cleanup."""

    def test_returns_zero_dict_when_mlflow_writer_none(self, tmp_path):
        # Without a writer the entry point is a no-op returning zeroed counts.
        result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
            None,
            tensorboard_dir=str(tmp_path),
            exp_root_path=str(tmp_path),
        )
        assert result == {"traces": 0, "logs": 0, "tracelens_reports": 0}

    def test_upload_traces_called_with_correct_artifact_path(self, tmp_path):
        (tmp_path / "rank_0.pt.trace.json").write_text("{}")
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "upload_trace_files_to_mlflow",
            return_value=2,
        ) as mock_traces:
            result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
                mock_writer,
                tensorboard_dir=str(tmp_path),
                exp_root_path=str(tmp_path),
                upload_traces=True,
                upload_logs=False,
                generate_tracelens_report=False,
                upload_tracelens_report=False,
            )
        assert result["traces"] == 2
        mock_traces.assert_called_once()
        # Traces land under the fixed "traces" artifact subdirectory.
        call_kw = mock_traces.call_args[1]
        assert call_kw["artifact_path"] == "traces"

    def test_upload_logs_called_with_correct_artifact_path(self, tmp_path):
        (tmp_path / "logs" / "master" / "master-0.log").parent.mkdir(parents=True)
        (tmp_path / "logs" / "master" / "master-0.log").write_text("log")
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "upload_log_files_to_mlflow",
            return_value=1,
        ) as mock_logs:
            result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
                mock_writer,
                tensorboard_dir=None,
                exp_root_path=str(tmp_path),
                upload_traces=False,
                upload_logs=True,
                generate_tracelens_report=False,
                upload_tracelens_report=False,
            )
        assert result["logs"] == 1
        mock_logs.assert_called_once()
        # Logs land under the fixed "logs" artifact subdirectory.
        call_kw = mock_logs.call_args[1]
        assert call_kw["artifact_path"] == "logs"

    def test_tracelens_upload_called_with_artifact_path_and_cleanup(self, tmp_path):
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "upload_tracelens_reports_to_mlflow",
            return_value=3,
        ) as mock_upload_tracelens:
            result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
                mock_writer,
                tensorboard_dir=str(tmp_path),
                exp_root_path=str(tmp_path),
                upload_traces=False,
                upload_logs=False,
                generate_tracelens_report=False,
                upload_tracelens_report=True,
                tracelens_ranks=[0, 8],
                tracelens_output_format="xlsx",
                tracelens_cleanup_after_upload=True,
            )
        assert result["tracelens_reports"] == 3
        mock_upload_tracelens.assert_called_once()
        # TraceLens options must be forwarded verbatim to the upload helper.
        call_kw = mock_upload_tracelens.call_args[1]
        assert call_kw["artifact_path"] == "trace_analysis"
        assert call_kw["cleanup_after_upload"] is True
        assert call_kw["ranks"] == [0, 8]
        assert call_kw["output_format"] == "xlsx"

    def test_tracelens_generate_locally_only_when_generate_true_upload_false(self, tmp_path):
        # generate=True, upload=False: reports are produced locally only and
        # the reported upload count stays zero.
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports_locally",
            return_value=5,
        ) as mock_local:
            with patch.object(
                mlflow_artifacts_mod,
                "upload_tracelens_reports_to_mlflow",
            ) as mock_upload:
                result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
                    mock_writer,
                    tensorboard_dir=str(tmp_path),
                    exp_root_path=str(tmp_path),
                    upload_traces=False,
                    upload_logs=False,
                    generate_tracelens_report=True,
                    upload_tracelens_report=False,
                )
        assert result["tracelens_reports"] == 0
        mock_local.assert_called_once()
        mock_upload.assert_not_called()

    def test_no_tracelens_calls_when_both_generate_and_upload_false(self, tmp_path):
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "generate_tracelens_reports_locally",
        ) as mock_local:
            with patch.object(
                mlflow_artifacts_mod,
                "upload_tracelens_reports_to_mlflow",
            ) as mock_upload:
                result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
                    mock_writer,
                    tensorboard_dir=str(tmp_path),
                    exp_root_path=str(tmp_path),
                    upload_traces=False,
                    upload_logs=False,
                    generate_tracelens_report=False,
                    upload_tracelens_report=False,
                )
        assert result["tracelens_reports"] == 0
        mock_local.assert_not_called()
        mock_upload.assert_not_called()

    def test_trace_and_log_discovery_integration(self, tmp_path):
        """Trace and log files are discovered and upload helpers called with correct paths."""
        (tmp_path / "rank_0.pt.trace.json").write_text("{}")
        (tmp_path / "logs" / "master" / "m.log").parent.mkdir(parents=True)
        (tmp_path / "logs" / "master" / "m.log").write_text("log")
        mock_writer = MagicMock()
        with patch.object(
            mlflow_artifacts_mod,
            "upload_trace_files_to_mlflow",
            return_value=1,
        ) as mock_traces:
            with patch.object(
                mlflow_artifacts_mod,
                "upload_log_files_to_mlflow",
                return_value=1,
            ) as mock_logs:
                result = mlflow_artifacts_mod.upload_artifacts_to_mlflow(
                    mock_writer,
                    tensorboard_dir=str(tmp_path),
                    exp_root_path=str(tmp_path),
                    upload_traces=True,
                    upload_logs=True,
                    generate_tracelens_report=False,
                    upload_tracelens_report=False,
                )
        assert result["traces"] == 1
        assert result["logs"] == 1
        assert result["tracelens_reports"] == 0
        mock_traces.assert_called_once_with(mock_writer, str(tmp_path), artifact_path="traces")
        mock_logs.assert_called_once_with(mock_writer, str(tmp_path), artifact_path="logs")


# -----------------------------------------------------------------------------
# Log file discovery
# -----------------------------------------------------------------------------


class TestGetAllLogFiles:
    """Test _get_all_log_files."""

    def test_returns_empty_for_empty_exp_root(self):
        out = mlflow_artifacts_mod._get_all_log_files("")
        assert out == []

    def test_returns_empty_when_logs_dir_missing(self, tmp_path):
        out = mlflow_artifacts_mod._get_all_log_files(str(tmp_path))
        assert out == []

    def test_finds_log_files_recursively(self, tmp_path):
        # Log files nested at different depths under logs/ are all found.
        logs_dir = tmp_path / "logs"
        logs_dir.mkdir()
        (logs_dir / "master" / "master-0.log").parent.mkdir(parents=True)
        (logs_dir / "master" / "master-0.log").write_text("log")
        (logs_dir / "train" / "rank-0" / "rank-0.log").parent.mkdir(parents=True)
        (logs_dir / "train" / "rank-0" / "rank-0.log").write_text("log")
        out = mlflow_artifacts_mod._get_all_log_files(str(tmp_path))
        assert len(out) == 2


# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------


def test_tracelens_install_ref_constant():
    """TRACELENS_INSTALL_REF is set for reproducibility."""
    assert hasattr(mlflow_artifacts_mod, "TRACELENS_INSTALL_REF")
    assert isinstance(mlflow_artifacts_mod.TRACELENS_INSTALL_REF, str)
    assert len(mlflow_artifacts_mod.TRACELENS_INSTALL_REF) > 0
###############################################################################
# Copyright (c) 2025, Advanced Micro Devices, Inc.
#
# See LICENSE for license information.
###############################################################################

from unittest.mock import MagicMock, patch

from primus.backends.megatron.training import mlflow_setup


class DummyArgs:
    """Minimal stand-in for the parsed Primus argument namespace."""

    def __init__(self, rank: int, world_size: int, mlflow_run_name=None):
        self.rank = rank
        self.world_size = world_size
        self.mlflow_run_name = mlflow_run_name


def _call_upload(generate_tracelens_report: bool, args: DummyArgs, mock_generate):
    """Invoke upload_mlflow_artifacts with the MLflow writer, the args lookup,
    and local TraceLens report generation all patched out."""
    module = mlflow_setup.__name__
    with patch(f"{module}.get_mlflow_writer", return_value=None):
        with patch(f"{module}.get_primus_args", return_value=args):
            with patch(f"{module}.generate_tracelens_reports_locally", mock_generate):
                mlflow_setup.upload_mlflow_artifacts(
                    tensorboard_dir="/tmp/tb",
                    exp_root_path="/tmp/exp",
                    generate_tracelens_report=generate_tracelens_report,
                )


def test_local_generation_single_rank_no_mlflow():
    # Single-process run with no MLflow run name: reports generated locally.
    generator = MagicMock()
    _call_upload(True, DummyArgs(rank=0, world_size=1, mlflow_run_name=None), generator)
    generator.assert_called_once()


def test_local_generation_skipped_when_mlflow_expected_distributed():
    # Distributed run that is expected to log to MLflow: no local generation.
    generator = MagicMock()
    _call_upload(True, DummyArgs(rank=0, world_size=8, mlflow_run_name="run"), generator)
    generator.assert_not_called()


def test_local_generation_distributed_without_mlflow():
    # Distributed run without an MLflow run name: still generate locally.
    generator = MagicMock()
    _call_upload(True, DummyArgs(rank=0, world_size=8, mlflow_run_name=None), generator)
    generator.assert_called_once()


def test_local_generation_disabled_when_flag_false():
    # Feature flag off: never generate, regardless of topology.
    generator = MagicMock()
    _call_upload(False, DummyArgs(rank=0, world_size=1, mlflow_run_name=None), generator)
    generator.assert_not_called()