From 7dbd2421d91ac40b987149f5fea5aa1a23d9b177 Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 18 Dec 2025 08:52:47 +0000 Subject: [PATCH 01/24] feat: Add MLflow artifact upload for traces and logs - Add mlflow_artifacts.py with functions to collect and upload trace/log files - Add upload_mlflow_artifacts() wrapper in global_vars.py - Integrate artifact upload in trainer.py before MLflow run ends - Add mlflow_upload_traces and mlflow_upload_logs config options - Add unique timestamp-based output directories for multi-node consistency - Pass MLflow environment variables through Docker container --- examples/run_local_pretrain.sh | 11 + examples/run_pretrain.sh | 21 +- examples/run_slurm_pretrain.sh | 19 +- .../backends/megatron/training/global_vars.py | 55 ++++ .../megatron/training/mlflow_artifacts.py | 246 ++++++++++++++++++ .../megatron/primus_megatron_module.yaml | 2 + primus/modules/trainer/megatron/trainer.py | 14 + 7 files changed, 365 insertions(+), 3 deletions(-) create mode 100644 primus/backends/megatron/training/mlflow_artifacts.py diff --git a/examples/run_local_pretrain.sh b/examples/run_local_pretrain.sh index 3e4ea341b..48f612779 100755 --- a/examples/run_local_pretrain.sh +++ b/examples/run_local_pretrain.sh @@ -93,6 +93,11 @@ ENV_ARGS+=("--env" "HF_TOKEN") ENV_ARGS+=("--env" "WANDB_API_KEY") ENV_ARGS+=("--env" "ENABLE_NUMA_BINDING") ENV_ARGS+=("--env" "HSA_KERNARG_POOL_SIZE") +# MLflow environment variables +ENV_ARGS+=("--env" "DATABRICKS_TOKEN") +ENV_ARGS+=("--env" "DATABRICKS_HOST") +ENV_ARGS+=("--env" "MLFLOW_TRACKING_URI") +ENV_ARGS+=("--env" "MLFLOW_REGISTRY_URI") echo "ENV_ARGS: ${ENV_ARGS[*]}" HOSTNAME=$(hostname) @@ -158,6 +163,12 @@ docker_podman_proxy run --rm \ --env GPUS_PER_NODE \ --env DATA_PATH \ --env TRAIN_LOG \ + --env PRIMUS_WORKSPACE \ + --env PRIMUS_EXP_NAME \ + --env TIMESTAMP \ + --env LOG_DIR \ + --env PRIMUS_TEAM \ + --env PRIMUS_USER \ --env HSA_NO_SCRATCH_RECLAIM \ --env NVTE_CK_USES_BWD_V3 \ --env GPU_MAX_HW_QUEUES \ diff --git a/examples/run_pretrain.sh b/examples/run_pretrain.sh index 9053b43ab..a936df288 100755 --- a/examples/run_pretrain.sh +++ b/examples/run_pretrain.sh @@ -123,11 +123,28 @@ fi # export AITER_JIT_DIR="${TMP_BUILD_DIR}/${CACHE_TAG}_aiter_cache" -TRAIN_LOG=${TRAIN_LOG:-"output/log_mp_pretrain_$(basename "$EXP" .yaml).txt"} +# Extract model name from EXP config file path (e.g., deepseek_v2_lite-pretrain.yaml -> deepseek_v2_lite-pretrain) +MODEL_NAME=$(basename "${EXP}" .yaml) + +# Only generate new timestamp/paths if not already set by run_slurm_pretrain.sh +# This ensures: 1) single-node gets fresh timestamp, 2) multi-node shares same directory +if [ -z "${PRIMUS_EXP_NAME}" ]; then + TIMESTAMP=$(date +%Y%m%d_%H%M%S) + export PRIMUS_WORKSPACE=${PRIMUS_WORKSPACE:-"./output"} + export PRIMUS_EXP_NAME="${MODEL_NAME}_${TIMESTAMP}" + export LOG_DIR="${PRIMUS_WORKSPACE}/${PRIMUS_EXP_NAME}" +fi +# Clear work_group and user_name to simplify path: workspace/exp_name +export PRIMUS_TEAM="" +export PRIMUS_USER="" + +mkdir -p "$LOG_DIR" +TRAIN_LOG="${LOG_DIR}/log_mp_pretrain.txt" LOG_INFO_RANK0 "==========Training info==========" LOG_INFO_RANK0 "EXP: $EXP" -LOG_INFO_RANK0 "EXP: $BACKEND" +LOG_INFO_RANK0 "BACKEND: $BACKEND" +LOG_INFO_RANK0 "OUTPUT_DIR: ${LOG_DIR}" LOG_INFO_RANK0 "TRAIN_LOG: $TRAIN_LOG" LOG_INFO_RANK0 "PRIMUS_PATH: $PRIMUS_PATH" LOG_INFO_RANK0 "DATA_PATH: $DATA_PATH" diff --git a/examples/run_slurm_pretrain.sh b/examples/run_slurm_pretrain.sh index 04da35a4d..7e6523239 100755 --- a/examples/run_slurm_pretrain.sh +++ b/examples/run_slurm_pretrain.sh @@ -34,7 +34,22 @@ export NNODES=${NNODES:-1} SCRIPT_DIR=$(dirname "$(realpath "${BASH_SOURCE[0]}")") -export LOG_DIR=${LOG_DIR:-"./output"} +# -------------------- Unique Output Directory Per Run -------------------- +# Extract model name from EXP config file path (e.g., deepseek_v2_lite-pretrain.yaml -> deepseek_v2_lite-pretrain) +MODEL_NAME=$(basename "${EXP:-unknown}" .yaml) +# Export TIMESTAMP so all nodes use the same value (prevents multi-node race condition) +TIMESTAMP=$(date +%Y%m%d_%H%M%S) +export TIMESTAMP + +# Set PRIMUS environment variables for output paths +BASE_LOG_DIR=${LOG_DIR:-"./output"} +export PRIMUS_WORKSPACE="${BASE_LOG_DIR}" +export PRIMUS_EXP_NAME="${MODEL_NAME}_${TIMESTAMP}" +export LOG_DIR="${PRIMUS_WORKSPACE}/${PRIMUS_EXP_NAME}" +# Clear work_group and user_name to simplify path: workspace/exp_name +export PRIMUS_TEAM="" +export PRIMUS_USER="" + LOG_FILE="${LOG_DIR}/log_slurm_pretrain.txt" mkdir -p "$LOG_DIR" @@ -52,6 +67,8 @@ srun -N "${NNODES}" \ echo \"SLURM_GPUS_ON_NODE: \${SLURM_GPUS_ON_NODE}\" echo \"\" fi + # Log TIMESTAMP on each node to verify consistency across nodes + echo \"[Node \$SLURM_NODEID] TIMESTAMP=\${TIMESTAMP}\" export MASTER_ADDR=\${node_array[0]} export MASTER_PORT=\${MASTER_PORT} export NNODES=\${SLURM_NNODES} diff --git a/primus/backends/megatron/training/global_vars.py b/primus/backends/megatron/training/global_vars.py index b23016d46..11c34d461 100644 --- a/primus/backends/megatron/training/global_vars.py +++ b/primus/backends/megatron/training/global_vars.py @@ -8,8 +8,11 @@ from primus.modules.module_utils import debug_rank_0 +from .mlflow_artifacts import upload_artifacts_to_mlflow + _GLOBAL_ARGS = None _GLOBAL_MLFLOW_WRITER = None +_GLOBAL_EXP_ROOT_PATH = None def set_args(args): @@ -23,6 +26,17 @@ def get_args(): return _GLOBAL_ARGS +def set_exp_root_path(exp_root_path): + """Set the experiment root path for artifact logging.""" + global _GLOBAL_EXP_ROOT_PATH + _GLOBAL_EXP_ROOT_PATH = exp_root_path + + +def get_exp_root_path(): + """Return experiment root path. Can be None.""" + return _GLOBAL_EXP_ROOT_PATH + + def get_mlflow_writer(): """Return mlflow writer. It can be None so no need to check if it is initialized.""" @@ -62,14 +76,51 @@ def _set_mlflow_writer(args): _GLOBAL_MLFLOW_WRITER = mlflow +def upload_mlflow_artifacts( + upload_traces: bool = True, + upload_logs: bool = True, +): + """ + Upload trace files and log files to MLflow as artifacts. + + This should be called before ending the MLflow run to ensure all + artifacts are uploaded. Only the rank that initialized MLflow + (typically rank world_size - 1) should call this. + + Args: + upload_traces: Whether to upload profiler trace files + upload_logs: Whether to upload training log files + + Returns: + Dictionary with counts of uploaded files, or None if MLflow is not enabled + """ + mlflow_writer = get_mlflow_writer() + if mlflow_writer is None: + return None + + args = get_args() + exp_root_path = get_exp_root_path() + tensorboard_dir = getattr(args, "tensorboard_dir", None) + + return upload_artifacts_to_mlflow( + mlflow_writer=mlflow_writer, + tensorboard_dir=tensorboard_dir, + exp_root_path=exp_root_path, + upload_traces=upload_traces, + upload_logs=upload_logs, + ) + + def unset_global_variables(): """Unset global vars.""" global _GLOBAL_ARGS global _GLOBAL_MLFLOW_WRITER + global _GLOBAL_EXP_ROOT_PATH _GLOBAL_ARGS = None _GLOBAL_MLFLOW_WRITER = None + _GLOBAL_EXP_ROOT_PATH = None def _ensure_var_is_initialized(var, name): @@ -84,4 +135,8 @@ def _ensure_var_is_not_initialized(var, name): def destroy_global_vars(): global _GLOBAL_ARGS + global _GLOBAL_MLFLOW_WRITER + global _GLOBAL_EXP_ROOT_PATH _GLOBAL_ARGS = None + _GLOBAL_MLFLOW_WRITER = None + _GLOBAL_EXP_ROOT_PATH = None diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py new file mode 100644 index 000000000..67caa0e62 --- /dev/null +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -0,0 +1,246 @@ +############################################################################### +# Copyright (c) 2025, Advanced Micro Devices, Inc. All rights reserved. +# +# See LICENSE for license information. +############################################################################### + +""" +MLflow Artifact Logging Utilities + +This module provides functions to upload trace files and log files to MLflow +when MLflow tracking is enabled. + +Features: +- Upload profiler trace files from all profiled ranks (including multi-node) +- Upload log files from all levels and all ranks +- Supports both local and distributed training scenarios +""" + +import glob +import os +from typing import Optional + +from primus.modules.module_utils import log_rank_0, warning_rank_0 + + +def _get_all_trace_files(tensorboard_dir: str) -> list: + """ + Find all profiler trace files in the tensorboard directory. + + Trace files are typically named like: + - *.pt.trace.json + - *.pt.trace.json.gz + + Args: + tensorboard_dir: Path to the tensorboard directory containing trace files + + Returns: + List of paths to trace files + """ + if not tensorboard_dir or not os.path.exists(tensorboard_dir): + return [] + + trace_files = [] + # Look for PyTorch profiler trace files (both compressed and uncompressed) + patterns = ["*.pt.trace.json", "*.pt.trace.json.gz"] + for pattern in patterns: + trace_files.extend(glob.glob(os.path.join(tensorboard_dir, pattern))) + trace_files.extend(glob.glob(os.path.join(tensorboard_dir, "**", pattern), recursive=True)) + + # Remove duplicates while preserving order + seen = set() + unique_files = [] + for f in trace_files: + if f not in seen: + seen.add(f) + unique_files.append(f) + + return unique_files + + +def _get_all_log_files(exp_root_path: str) -> list: + """ + Find all log files in the experiment logs directory. + + Log files are organized as: + - {exp_root_path}/logs/master/master-*.log + - {exp_root_path}/logs/{module_name}/rank-{rank}/*.log + + Args: + exp_root_path: Root path of the experiment + + Returns: + List of paths to log files + """ + if not exp_root_path: + return [] + + logs_dir = os.path.join(exp_root_path, "logs") + if not os.path.exists(logs_dir): + return [] + + log_files = [] + # Find all .log files recursively + log_files.extend(glob.glob(os.path.join(logs_dir, "**", "*.log"), recursive=True)) + + return log_files + + +def upload_trace_files_to_mlflow( + mlflow_writer, + tensorboard_dir: str, + artifact_path: str = "traces", +) -> int: + """ + Upload all profiler trace files to MLflow as artifacts. + + This function collects trace files from the tensorboard directory and + uploads them to MLflow. In distributed settings, only rank 0 (or the + last rank where MLflow writer is initialized) should call this. + + Args: + mlflow_writer: The MLflow module instance (from get_mlflow_writer()) + tensorboard_dir: Path to the tensorboard directory containing trace files + artifact_path: MLflow artifact subdirectory for trace files + + Returns: + Number of trace files uploaded + """ + if mlflow_writer is None: + return 0 + + log_rank_0(f"[MLflow] Searching for trace files in: {tensorboard_dir}") + trace_files = _get_all_trace_files(tensorboard_dir) + if len(trace_files) > 5: + log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files[:5]}...") + else: + log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files}") + + if not trace_files: + log_rank_0("[MLflow] No trace files found to upload") + return 0 + + uploaded_count = 0 + for trace_file in trace_files: + try: + # Get relative path from tensorboard_dir for artifact organization + rel_path = os.path.relpath(trace_file, tensorboard_dir) + # Determine artifact subdirectory based on file location + artifact_subpath = ( + os.path.join(artifact_path, os.path.dirname(rel_path)) + if os.path.dirname(rel_path) + else artifact_path + ) + + mlflow_writer.log_artifact(trace_file, artifact_path=artifact_subpath) + uploaded_count += 1 + log_rank_0(f"[MLflow] Uploaded trace file: {os.path.basename(trace_file)}") + except Exception as e: + warning_rank_0(f"[MLflow] Failed to upload trace file {trace_file}: {e}") + + log_rank_0(f"[MLflow] Uploaded {uploaded_count} trace files to '{artifact_path}'") + return uploaded_count + + +def upload_log_files_to_mlflow( + mlflow_writer, + exp_root_path: str, + artifact_path: str = "logs", +) -> int: + """ + Upload all log files to MLflow as artifacts. + + This function collects log files from all ranks and all log levels + and uploads them to MLflow. The directory structure is preserved + in the artifact path. + + Args: + mlflow_writer: The MLflow module instance (from get_mlflow_writer()) + exp_root_path: Root path of the experiment + artifact_path: MLflow artifact subdirectory for log files + + Returns: + Number of log files uploaded + """ + if mlflow_writer is None: + return 0 + + log_files = _get_all_log_files(exp_root_path) + + if not log_files: + log_rank_0("[MLflow] No log files found to upload") + return 0 + + logs_base_dir = os.path.join(exp_root_path, "logs") + uploaded_count = 0 + + for log_file in log_files: + try: + # Preserve directory structure relative to logs base directory + rel_path = os.path.relpath(log_file, logs_base_dir) + artifact_subpath = ( + os.path.join(artifact_path, os.path.dirname(rel_path)) + if os.path.dirname(rel_path) + else artifact_path + ) + + mlflow_writer.log_artifact(log_file, artifact_path=artifact_subpath) + uploaded_count += 1 + except Exception as e: + warning_rank_0(f"[MLflow] Failed to upload log file {log_file}: {e}") + + log_rank_0(f"[MLflow] Uploaded {uploaded_count} log files to '{artifact_path}'") + return uploaded_count + + +def upload_artifacts_to_mlflow( + mlflow_writer, + tensorboard_dir: Optional[str] = None, + exp_root_path: Optional[str] = None, + upload_traces: bool = True, + upload_logs: bool = True, +) -> dict: + """ + Upload all artifacts (trace files and log files) to MLflow. + + This is the main entry point for uploading artifacts to MLflow. + It handles both trace files from profiling and log files from training. + + Args: + mlflow_writer: The MLflow module instance (from get_mlflow_writer()) + tensorboard_dir: Path to the tensorboard directory containing trace files + exp_root_path: Root path of the experiment for log files + upload_traces: Whether to upload trace files + upload_logs: Whether to upload log files + + Returns: + Dictionary with counts of uploaded files: + { + "traces": , + "logs": + } + """ + if mlflow_writer is None: + log_rank_0("[MLflow] MLflow writer not available, skipping artifact upload") + return {"traces": 0, "logs": 0} + + log_rank_0("[MLflow] Starting artifact upload to MLflow...") + log_rank_0(f"[MLflow] tensorboard_dir: {tensorboard_dir}") + log_rank_0(f"[MLflow] exp_root_path: {exp_root_path}") + log_rank_0(f"[MLflow] upload_traces: {upload_traces}, upload_logs: {upload_logs}") + + result = {"traces": 0, "logs": 0} + + if upload_traces and tensorboard_dir: + result["traces"] = upload_trace_files_to_mlflow( + mlflow_writer, tensorboard_dir, artifact_path="traces" + ) + + if upload_logs and exp_root_path: + result["logs"] = upload_log_files_to_mlflow(mlflow_writer, exp_root_path, artifact_path="logs") + + log_rank_0( + f"[MLflow] Artifact upload complete: " f"{result['traces']} trace files, {result['logs']} log files" + ) + + return result diff --git a/primus/configs/modules/megatron/primus_megatron_module.yaml b/primus/configs/modules/megatron/primus_megatron_module.yaml index 0ec3a22b0..6d8e4a6bf 100644 --- a/primus/configs/modules/megatron/primus_megatron_module.yaml +++ b/primus/configs/modules/megatron/primus_megatron_module.yaml @@ -5,6 +5,8 @@ disable_wandb: true disable_mlflow: true mlflow_run_name: null mlflow_experiment_name: null +mlflow_upload_traces: true # Upload profiler trace files to MLflow +mlflow_upload_logs: true # Upload training log files to MLflow disable_compile_dependencies: true # NOTE: # - If `use_rocm_mem_info = True`, ROCm memory information will be collected diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 9758929da..a56a3bb29 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -144,7 +144,9 @@ from primus.backends.megatron.model_provider import primus_model_provider from primus.backends.megatron.training.global_vars import ( get_mlflow_writer, + set_exp_root_path, set_primus_global_variables, + upload_mlflow_artifacts, ) from primus.backends.megatron.training.tokenizer.tokenizer import build_tokenizer from primus.core.utils import checker, file_utils @@ -1243,6 +1245,8 @@ def initialize_megatron( set_global_variables(args, build_tokenizer=False) log_rank_0(f"-set_primus_global_variables...") set_primus_global_variables(args) + # Set exp_root_path for MLflow artifact logging + set_exp_root_path(self.exp_root_path) args = get_args() # set tokenizer @@ -1611,6 +1615,11 @@ def run(self, *args, **kwargs): mlflow_writer = get_mlflow_writer() if mlflow_writer: + # Upload artifacts before ending the run + upload_mlflow_artifacts( + upload_traces=getattr(args, "mlflow_upload_traces", True), + upload_logs=getattr(args, "mlflow_upload_logs", True), + ) mlflow_writer.end_run() one_logger and one_logger.log_metrics({"app_finish_time": one_logger_utils.get_timestamp_in_ms()}) @@ -2055,6 +2064,11 @@ def get_e2e_base_metrics(): wandb_writer.finish() mlflow_writer = get_mlflow_writer() if mlflow_writer: + # Upload artifacts before ending the run + upload_mlflow_artifacts( + upload_traces=getattr(args, "mlflow_upload_traces", True), + upload_logs=getattr(args, "mlflow_upload_logs", True), + ) mlflow_writer.end_run() ft_integration.shutdown() sys.exit(exit_code) From 13dfa81a9291d5034e1baf9aa74f1749165adf89 Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 18 Dec 2025 10:28:08 +0000 Subject: [PATCH 02/24] docs: Clarify MLflow upload defaults are opt-out when MLflow enabled --- primus/configs/modules/megatron/primus_megatron_module.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/primus/configs/modules/megatron/primus_megatron_module.yaml b/primus/configs/modules/megatron/primus_megatron_module.yaml index 6d8e4a6bf..74f46f257 100644 --- a/primus/configs/modules/megatron/primus_megatron_module.yaml +++ b/primus/configs/modules/megatron/primus_megatron_module.yaml @@ -5,6 +5,8 @@ disable_wandb: true disable_mlflow: true mlflow_run_name: null mlflow_experiment_name: null +# NOTE: When disable_mlflow=false, traces and logs are uploaded by default. +# Set these to false if you only want metrics/params logged to MLflow. mlflow_upload_traces: true # Upload profiler trace files to MLflow mlflow_upload_logs: true # Upload training log files to MLflow disable_compile_dependencies: true From 1f2e136ecc8da96303d172cae9ce8c1153b328cd Mon Sep 17 00:00:00 2001 From: GP Huang Date: Thu, 18 Dec 2025 12:36:54 +0200 Subject: [PATCH 03/24] Update primus/modules/trainer/megatron/trainer.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- primus/modules/trainer/megatron/trainer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index a56a3bb29..b2c212cc5 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -1245,7 +1245,7 @@ def initialize_megatron( set_global_variables(args, build_tokenizer=False) log_rank_0(f"-set_primus_global_variables...") set_primus_global_variables(args) - # Set exp_root_path for MLflow artifact logging + # Set exp_root_path for MLflow artifact upload (needed before training starts) set_exp_root_path(self.exp_root_path) args = get_args() From d30b9202bf97dc3b1d693748d128089adc93b066 Mon Sep 17 00:00:00 2001 From: GP Huang Date: Thu, 18 Dec 2025 12:37:23 +0200 Subject: [PATCH 04/24] Update examples/run_pretrain.sh Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/run_pretrain.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/run_pretrain.sh b/examples/run_pretrain.sh index a936df288..ebde568d7 100755 --- a/examples/run_pretrain.sh +++ b/examples/run_pretrain.sh @@ -126,8 +126,8 @@ fi # Extract model name from EXP config file path (e.g., deepseek_v2_lite-pretrain.yaml -> deepseek_v2_lite-pretrain) MODEL_NAME=$(basename "${EXP}" .yaml) -# Only generate new timestamp/paths if not already set by run_slurm_pretrain.sh -# This ensures: 1) single-node gets fresh timestamp, 2) multi-node shares same directory +# Only generate new timestamp/paths if not already set by run_slurm_pretrain.sh. +# This ensures single-node runs get a fresh timestamp, while multi-node runs share the same directory. if [ -z "${PRIMUS_EXP_NAME}" ]; then TIMESTAMP=$(date +%Y%m%d_%H%M%S) export PRIMUS_WORKSPACE=${PRIMUS_WORKSPACE:-"./output"} From b2da61b84356ad03ed3adfcc23685483eae8c7d2 Mon Sep 17 00:00:00 2001 From: GP Huang Date: Thu, 18 Dec 2025 12:44:58 +0200 Subject: [PATCH 05/24] Update primus/backends/megatron/training/mlflow_artifacts.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- primus/backends/megatron/training/mlflow_artifacts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index 67caa0e62..c844036db 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -240,7 +240,7 @@ def upload_artifacts_to_mlflow( result["logs"] = upload_log_files_to_mlflow(mlflow_writer, exp_root_path, artifact_path="logs") log_rank_0( - f"[MLflow] Artifact upload complete: " f"{result['traces']} trace files, {result['logs']} log files" + f"[MLflow] Artifact upload complete: {result['traces']} trace files, {result['logs']} log files" ) return result From 283a1f4740aef8d5ecc2c627118734bfb10c098e Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 18 Dec 2025 15:14:41 +0000 Subject: [PATCH 06/24] fix: Escape glob paths to handle [] characters in experiment names The experiment name contains square brackets like [deepseek_v2_lite-pretrain_...]-rank[0] which are interpreted as glob pattern character classes, causing glob.glob to return empty results even though files exist. Fixed by using glob.escape() on directory paths before using them with glob.glob(). --- primus/backends/megatron/training/mlflow_artifacts.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index c844036db..f271dc639 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -43,9 +43,11 @@ def _get_all_trace_files(tensorboard_dir: str) -> list: trace_files = [] # Look for PyTorch profiler trace files (both compressed and uncompressed) patterns = ["*.pt.trace.json", "*.pt.trace.json.gz"] + # Escape directory path to handle special characters like [] in experiment names + escaped_dir = glob.escape(tensorboard_dir) for pattern in patterns: - trace_files.extend(glob.glob(os.path.join(tensorboard_dir, pattern))) - trace_files.extend(glob.glob(os.path.join(tensorboard_dir, "**", pattern), recursive=True)) + trace_files.extend(glob.glob(os.path.join(escaped_dir, pattern))) + trace_files.extend(glob.glob(os.path.join(escaped_dir, "**", pattern), recursive=True)) # Remove duplicates while preserving order seen = set() @@ -80,8 +82,8 @@ def _get_all_log_files(exp_root_path: str) -> list: return [] log_files = [] - # Find all .log files recursively - log_files.extend(glob.glob(os.path.join(logs_dir, "**", "*.log"), recursive=True)) + # Find all .log files recursively (escape path to handle special characters) + log_files.extend(glob.glob(os.path.join(glob.escape(logs_dir), "**", "*.log"), recursive=True)) return log_files From 2b413d07ae59cdff15a192d88c3b6cde7f8335af Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 15 Jan 2026 08:49:46 +0000 Subject: [PATCH 07/24] Minor fix: lint format --- primus/modules/trainer/megatron/trainer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 2236bab4c..32abc9fdc 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -145,11 +145,11 @@ from primus.backends.megatron.model_provider import primus_model_provider from primus.backends.megatron.training.global_vars import ( get_mlflow_writer, - set_exp_root_path, - upload_mlflow_artifacts, get_train_start_time, + set_exp_root_path, set_primus_global_variables, set_train_start_time, + upload_mlflow_artifacts, ) from primus.backends.megatron.training.tokenizer.tokenizer import build_tokenizer from primus.core.utils import checker, file_utils From d7417d874f4a0b68bda4d6c15530e96ed8bc3f67 Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 15 Jan 2026 10:24:48 +0000 Subject: [PATCH 08/24] minor fix --- primus/backends/megatron/training/global_vars.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/primus/backends/megatron/training/global_vars.py b/primus/backends/megatron/training/global_vars.py index e58029177..ffd0c4531 100644 --- a/primus/backends/megatron/training/global_vars.py +++ b/primus/backends/megatron/training/global_vars.py @@ -5,7 +5,6 @@ # See LICENSE for license information. ############################################################################### - import time from primus.modules.module_utils import debug_rank_0 @@ -38,6 +37,8 @@ def set_exp_root_path(exp_root_path): def get_exp_root_path(): """Return experiment root path. Can be None.""" return _GLOBAL_EXP_ROOT_PATH + + def set_train_start_time(start_time=None): """Set training start time. If not provided, use current time.""" global _TRAIN_START_TIME From 167ec9953ead09e79aa93c0539e7dbd6784665c6 Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 2 Feb 2026 12:43:41 +0000 Subject: [PATCH 09/24] Refactor MLflow artifact features to separate module Move MLflow artifact upload functions from global_vars.py to new mlflow_setup.py to reduce merge conflicts: - set_exp_root_path() - get_exp_root_path() - upload_mlflow_artifacts() global_vars.py now matches main, avoiding future conflicts when merging from main branch. --- .../backends/megatron/training/global_vars.py | 56 +-------------- .../megatron/training/mlflow_setup.py | 69 +++++++++++++++++++ primus/modules/trainer/megatron/trainer.py | 4 +- 3 files changed, 73 insertions(+), 56 deletions(-) create mode 100644 primus/backends/megatron/training/mlflow_setup.py diff --git a/primus/backends/megatron/training/global_vars.py b/primus/backends/megatron/training/global_vars.py index 6b6ff96a9..5b2ae4825 100644 --- a/primus/backends/megatron/training/global_vars.py +++ b/primus/backends/megatron/training/global_vars.py @@ -5,6 +5,7 @@ # See LICENSE for license information. ############################################################################### + import json import time @@ -14,11 +15,8 @@ ) from primus.modules.module_utils import debug_rank_0 -from .mlflow_artifacts import upload_artifacts_to_mlflow - _GLOBAL_ARGS = None _GLOBAL_MLFLOW_WRITER = None -_GLOBAL_EXP_ROOT_PATH = None _TRAIN_START_TIME = None @@ -33,17 +31,6 @@ def get_args(): return _GLOBAL_ARGS -def set_exp_root_path(exp_root_path): - """Set the experiment root path for artifact logging.""" - global _GLOBAL_EXP_ROOT_PATH - _GLOBAL_EXP_ROOT_PATH = exp_root_path - - -def get_exp_root_path(): - """Return experiment root path. Can be None.""" - return _GLOBAL_EXP_ROOT_PATH - - def set_train_start_time(start_time=None): """Set training start time. If not provided, use current time.""" global _TRAIN_START_TIME @@ -149,51 +136,14 @@ def _set_mlflow_writer(args): _GLOBAL_MLFLOW_WRITER = mlflow -def upload_mlflow_artifacts( - upload_traces: bool = True, - upload_logs: bool = True, -): - """ - Upload trace files and log files to MLflow as artifacts. - - This should be called before ending the MLflow run to ensure all - artifacts are uploaded. Only the rank that initialized MLflow - (typically rank world_size - 1) should call this. - - Args: - upload_traces: Whether to upload profiler trace files - upload_logs: Whether to upload training log files - - Returns: - Dictionary with counts of uploaded files, or None if MLflow is not enabled - """ - mlflow_writer = get_mlflow_writer() - if mlflow_writer is None: - return None - - args = get_args() - exp_root_path = get_exp_root_path() - tensorboard_dir = getattr(args, "tensorboard_dir", None) - - return upload_artifacts_to_mlflow( - mlflow_writer=mlflow_writer, - tensorboard_dir=tensorboard_dir, - exp_root_path=exp_root_path, - upload_traces=upload_traces, - upload_logs=upload_logs, - ) - - def unset_global_variables(): """Unset global vars.""" global _GLOBAL_ARGS global _GLOBAL_MLFLOW_WRITER - global _GLOBAL_EXP_ROOT_PATH _GLOBAL_ARGS = None _GLOBAL_MLFLOW_WRITER = None - _GLOBAL_EXP_ROOT_PATH = None def _ensure_var_is_initialized(var, name): @@ -208,8 +158,4 @@ def _ensure_var_is_not_initialized(var, name): def destroy_global_vars(): global _GLOBAL_ARGS - global _GLOBAL_MLFLOW_WRITER - global _GLOBAL_EXP_ROOT_PATH _GLOBAL_ARGS = None - _GLOBAL_MLFLOW_WRITER = None - _GLOBAL_EXP_ROOT_PATH = None diff --git a/primus/backends/megatron/training/mlflow_setup.py b/primus/backends/megatron/training/mlflow_setup.py new file mode 100644 index 000000000..119f394e9 --- /dev/null +++ b/primus/backends/megatron/training/mlflow_setup.py @@ -0,0 +1,69 @@ +############################################################################### +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# Modification Copyright© 2025 Advanced Micro Devices, Inc. All rights reserved. +# +# See LICENSE for license information. +############################################################################### +""" +MLflow artifact upload utilities. + +This module provides functions for uploading artifacts (traces, logs) to MLflow. +Separated from global_vars.py to reduce merge conflicts. +""" + +from .global_vars import get_args, get_mlflow_writer +from .mlflow_artifacts import upload_artifacts_to_mlflow + +_GLOBAL_EXP_ROOT_PATH = None + + +def set_exp_root_path(exp_root_path): + """Set the experiment root path for artifact logging.""" + global _GLOBAL_EXP_ROOT_PATH + _GLOBAL_EXP_ROOT_PATH = exp_root_path + + +def get_exp_root_path(): + """Return experiment root path. Can be None.""" + return _GLOBAL_EXP_ROOT_PATH + + +def reset_exp_root_path(): + """Reset the experiment root path to None.""" + global _GLOBAL_EXP_ROOT_PATH + _GLOBAL_EXP_ROOT_PATH = None + + +def upload_mlflow_artifacts( + upload_traces: bool = True, + upload_logs: bool = True, +): + """ + Upload trace files and log files to MLflow as artifacts. + + This should be called before ending the MLflow run to ensure all + artifacts are uploaded. Only the rank that initialized MLflow + (typically rank world_size - 1) should call this. + + Args: + upload_traces: Whether to upload profiler trace files + upload_logs: Whether to upload training log files + + Returns: + Dictionary with counts of uploaded files, or None if MLflow is not enabled + """ + mlflow_writer = get_mlflow_writer() + if mlflow_writer is None: + return None + + args = get_args() + exp_root_path = get_exp_root_path() + tensorboard_dir = getattr(args, "tensorboard_dir", None) + + return upload_artifacts_to_mlflow( + mlflow_writer=mlflow_writer, + tensorboard_dir=tensorboard_dir, + exp_root_path=exp_root_path, + upload_traces=upload_traces, + upload_logs=upload_logs, + ) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index fbcd3fd95..f718c9dd6 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -146,9 +146,11 @@ from primus.backends.megatron.training.global_vars import ( get_mlflow_writer, get_train_start_time, - set_exp_root_path, set_primus_global_variables, set_train_start_time, +) +from primus.backends.megatron.training.mlflow_setup import ( + set_exp_root_path, upload_mlflow_artifacts, ) from primus.backends.megatron.training.tokenizer.tokenizer import build_tokenizer From 24bfa173fa50d60fe618c8415c91761f4bcface3 Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 2 Feb 2026 14:38:19 +0000 Subject: [PATCH 10/24] Revert run script modifications Keep run_pretrain.sh and run_slurm_pretrain.sh as main. Experiment paths can be configured via environment variables: - PRIMUS_TEAM, PRIMUS_USER, PRIMUS_EXP_NAME, PRIMUS_WORKSPACE --- examples/run_pretrain.sh | 21 ++------------------- examples/run_slurm_pretrain.sh | 19 +------------------ 2 files changed, 3 insertions(+), 37 deletions(-) diff --git a/examples/run_pretrain.sh b/examples/run_pretrain.sh index 80d866e2d..dd23ec02d 100755 --- a/examples/run_pretrain.sh +++ b/examples/run_pretrain.sh @@ -149,28 +149,11 @@ fi # export AITER_JIT_DIR="${TMP_BUILD_DIR}/${CACHE_TAG}_aiter_cache" -# Extract model name from EXP config file path (e.g., deepseek_v2_lite-pretrain.yaml -> deepseek_v2_lite-pretrain) -MODEL_NAME=$(basename "${EXP}" .yaml) - -# Only generate new timestamp/paths if not already set by run_slurm_pretrain.sh. -# This ensures single-node runs get a fresh timestamp, while multi-node runs share the same directory. -if [ -z "${PRIMUS_EXP_NAME}" ]; then - TIMESTAMP=$(date +%Y%m%d_%H%M%S) - export PRIMUS_WORKSPACE=${PRIMUS_WORKSPACE:-"./output"} - export PRIMUS_EXP_NAME="${MODEL_NAME}_${TIMESTAMP}" - export LOG_DIR="${PRIMUS_WORKSPACE}/${PRIMUS_EXP_NAME}" -fi -# Clear work_group and user_name to simplify path: workspace/exp_name -export PRIMUS_TEAM="" -export PRIMUS_USER="" - -mkdir -p "$LOG_DIR" -TRAIN_LOG="${LOG_DIR}/log_mp_pretrain.txt" +TRAIN_LOG=${TRAIN_LOG:-"output/log_mp_pretrain_$(basename "$EXP" .yaml).txt"} LOG_INFO_RANK0 "==========Training info==========" LOG_INFO_RANK0 "EXP: $EXP" -LOG_INFO_RANK0 "BACKEND: $BACKEND" -LOG_INFO_RANK0 "OUTPUT_DIR: ${LOG_DIR}" +LOG_INFO_RANK0 "EXP: $BACKEND" LOG_INFO_RANK0 "TRAIN_LOG: $TRAIN_LOG" LOG_INFO_RANK0 "PRIMUS_PATH: $PRIMUS_PATH" LOG_INFO_RANK0 "DATA_PATH: $DATA_PATH" diff --git a/examples/run_slurm_pretrain.sh b/examples/run_slurm_pretrain.sh index 7e6523239..04da35a4d 100755 --- a/examples/run_slurm_pretrain.sh +++ b/examples/run_slurm_pretrain.sh @@ -34,22 +34,7 @@ export NNODES=${NNODES:-1} SCRIPT_DIR=$(dirname "$(realpath "${BASH_SOURCE[0]}")") -# -------------------- Unique Output Directory Per Run -------------------- -# Extract model name from EXP config file path (e.g., deepseek_v2_lite-pretrain.yaml -> deepseek_v2_lite-pretrain) -MODEL_NAME=$(basename "${EXP:-unknown}" .yaml) -# Export TIMESTAMP so all nodes use the same value (prevents multi-node race condition) -TIMESTAMP=$(date +%Y%m%d_%H%M%S) -export TIMESTAMP - -# Set PRIMUS environment variables for output paths -BASE_LOG_DIR=${LOG_DIR:-"./output"} -export PRIMUS_WORKSPACE="${BASE_LOG_DIR}" -export PRIMUS_EXP_NAME="${MODEL_NAME}_${TIMESTAMP}" -export LOG_DIR="${PRIMUS_WORKSPACE}/${PRIMUS_EXP_NAME}" -# Clear work_group and user_name to simplify path: workspace/exp_name -export PRIMUS_TEAM="" -export PRIMUS_USER="" - +export LOG_DIR=${LOG_DIR:-"./output"} LOG_FILE="${LOG_DIR}/log_slurm_pretrain.txt" mkdir -p "$LOG_DIR" @@ -67,8 +52,6 @@ srun -N "${NNODES}" \ echo \"SLURM_GPUS_ON_NODE: \${SLURM_GPUS_ON_NODE}\" echo \"\" fi - # Log TIMESTAMP on each node to verify consistency across nodes - echo \"[Node \$SLURM_NODEID] TIMESTAMP=\${TIMESTAMP}\" export MASTER_ADDR=\${node_array[0]} export MASTER_PORT=\${MASTER_PORT} export NNODES=\${SLURM_NNODES} From 62196029bf8a49894f8bb6fe4e44ca2f8ab4f4d8 Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 2 Feb 2026 14:45:23 +0000 Subject: [PATCH 11/24] Revert run_local_pretrain.sh to main --- examples/run_local_pretrain.sh | 7 ------- 1 file changed, 7 deletions(-) diff --git a/examples/run_local_pretrain.sh b/examples/run_local_pretrain.sh index c24111984..04fe9720b 100755 --- a/examples/run_local_pretrain.sh +++ b/examples/run_local_pretrain.sh @@ -103,7 +103,6 @@ ENV_ARGS+=("--env" "HF_TOKEN") ENV_ARGS+=("--env" "WANDB_API_KEY") ENV_ARGS+=("--env" "ENABLE_NUMA_BINDING") ENV_ARGS+=("--env" "HSA_KERNARG_POOL_SIZE") -# MLflow environment variables ENV_ARGS+=("--env" "DATABRICKS_TOKEN") ENV_ARGS+=("--env" "DATABRICKS_HOST") ENV_ARGS+=("--env" "MLFLOW_TRACKING_URI") @@ -174,12 +173,6 @@ docker_podman_proxy run --rm \ --env GPUS_PER_NODE \ --env DATA_PATH \ --env TRAIN_LOG \ - --env PRIMUS_WORKSPACE \ - --env PRIMUS_EXP_NAME \ - --env TIMESTAMP \ - --env LOG_DIR \ - --env PRIMUS_TEAM \ - --env PRIMUS_USER \ --env HSA_NO_SCRATCH_RECLAIM \ --env NVTE_CK_USES_BWD_V3 \ --env GPU_MAX_HW_QUEUES \ From 0aa8aa33518070a201582003cb59a5feece98e45 Mon Sep 17 00:00:00 2001 From: guangphu Date: Tue, 3 Feb 2026 11:29:29 +0000 Subject: [PATCH 12/24] Address PR review feedback for MLflow artifacts - Add progress logging with counter (e.g., "Uploaded 5/100 trace files") - Warn about long upload times for large file counts - Add dist.barrier() before uploads to sync all ranks - Document multi-node shared storage requirement - Add runtime warning when multi-node training detected - Add comprehensive unit tests for mlflow_artifacts module --- .../megatron/training/mlflow_artifacts.py | 50 +++- primus/modules/trainer/megatron/trainer.py | 6 + .../megatron/test_mlflow_artifacts.py | 272 ++++++++++++++++++ 3 files changed, 325 insertions(+), 3 deletions(-) create mode 100644 tests/unit_tests/backends/megatron/test_mlflow_artifacts.py diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index f271dc639..f37408e61 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -14,6 +14,12 @@ - Upload profiler trace files from all profiled ranks (including multi-node) - Upload log files from all levels and all ranks - Supports both local and distributed training scenarios + +Note: + Multi-node training requires shared storage (e.g., NFS) for artifact uploads. + Only the last rank (world_size - 1) performs the upload, so it must have + access to trace and log files from all nodes. If using node-local storage, + only files from the uploading node will be uploaded. """ import glob @@ -122,6 +128,16 @@ def upload_trace_files_to_mlflow( log_rank_0("[MLflow] No trace files found to upload") return 0 + total_files = len(trace_files) + + # Warn about potentially long upload times for large uploads + if total_files > 10: + total_size_mb = sum(os.path.getsize(f) for f in trace_files) / (1024 * 1024) + warning_rank_0( + f"[MLflow] Uploading {total_files} trace files ({total_size_mb:.1f} MB total). " + "This may take a while..." + ) + uploaded_count = 0 for trace_file in trace_files: try: @@ -136,11 +152,15 @@ def upload_trace_files_to_mlflow( mlflow_writer.log_artifact(trace_file, artifact_path=artifact_subpath) uploaded_count += 1 - log_rank_0(f"[MLflow] Uploaded trace file: {os.path.basename(trace_file)}") + # Progress logging with counter + log_rank_0( + f"[MLflow] Uploaded trace file ({uploaded_count}/{total_files}): " + f"{os.path.basename(trace_file)}" + ) except Exception as e: warning_rank_0(f"[MLflow] Failed to upload trace file {trace_file}: {e}") - log_rank_0(f"[MLflow] Uploaded {uploaded_count} trace files to '{artifact_path}'") + log_rank_0(f"[MLflow] Uploaded {uploaded_count}/{total_files} trace files to '{artifact_path}'") return uploaded_count @@ -173,6 +193,16 @@ def upload_log_files_to_mlflow( log_rank_0("[MLflow] No log files found to upload") return 0 + total_files = len(log_files) + + # Warn about potentially long upload times for large uploads + if total_files > 20: + total_size_mb = sum(os.path.getsize(f) for f in log_files) / (1024 * 1024) + warning_rank_0( + f"[MLflow] Uploading {total_files} log files ({total_size_mb:.1f} MB total). " + "This may take a while..." + ) + logs_base_dir = os.path.join(exp_root_path, "logs") uploaded_count = 0 @@ -191,7 +221,7 @@ def upload_log_files_to_mlflow( except Exception as e: warning_rank_0(f"[MLflow] Failed to upload log file {log_file}: {e}") - log_rank_0(f"[MLflow] Uploaded {uploaded_count} log files to '{artifact_path}'") + log_rank_0(f"[MLflow] Uploaded {uploaded_count}/{total_files} log files to '{artifact_path}'") return uploaded_count @@ -208,6 +238,11 @@ def upload_artifacts_to_mlflow( This is the main entry point for uploading artifacts to MLflow. It handles both trace files from profiling and log files from training. + Note: + Multi-node training requires shared storage (e.g., NFS) for complete + artifact uploads. Only the last rank performs the upload, so it must + have filesystem access to trace/log files from all nodes. + Args: mlflow_writer: The MLflow module instance (from get_mlflow_writer()) tensorboard_dir: Path to the tensorboard directory containing trace files @@ -226,6 +261,15 @@ def upload_artifacts_to_mlflow( log_rank_0("[MLflow] MLflow writer not available, skipping artifact upload") return {"traces": 0, "logs": 0} + # Warn about multi-node shared storage requirement + nnodes = int(os.environ.get("NNODES", os.environ.get("SLURM_NNODES", "1"))) + if nnodes > 1: + warning_rank_0( + f"[MLflow] Multi-node training detected ({nnodes} nodes). " + "Ensure shared storage (e.g., NFS) is used for complete artifact uploads. " + "Only files accessible from this node will be uploaded." + ) + log_rank_0("[MLflow] Starting artifact upload to MLflow...") log_rank_0(f"[MLflow] tensorboard_dir: {tensorboard_dir}") log_rank_0(f"[MLflow] exp_root_path: {exp_root_path}") diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index f718c9dd6..8f063d146 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -1127,6 +1127,9 @@ def run(self, *args, **kwargs): mlflow_writer = get_mlflow_writer() if mlflow_writer: + # Barrier to ensure all ranks have finished writing files before upload + if dist.is_initialized(): + dist.barrier() # Upload artifacts before ending the run upload_mlflow_artifacts( upload_traces=getattr(args, "mlflow_upload_traces", True), @@ -1576,6 +1579,9 @@ def get_e2e_base_metrics(): wandb_writer.finish() mlflow_writer = get_mlflow_writer() if mlflow_writer: + # Barrier to ensure all ranks have finished writing files before upload + if dist.is_initialized(): + dist.barrier() # Upload artifacts before ending the run upload_mlflow_artifacts( upload_traces=getattr(args, "mlflow_upload_traces", True), diff --git a/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py new file mode 100644 index 000000000..686c9c3cb --- /dev/null +++ b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py @@ -0,0 +1,272 @@ +############################################################################### +# Copyright (c) 2025, Advanced Micro Devices, Inc. All rights reserved. +# +# See LICENSE for license information. +############################################################################### + +""" +Unit tests for MLflow artifact upload utilities. + +Focus areas: + 1. File discovery logic (_get_all_trace_files, _get_all_log_files) + 2. Upload functions with various scenarios (no files, multiple files, errors) + 3. Glob escaping for special characters in paths + 4. Relative path handling for artifact organization +""" + +from unittest.mock import MagicMock, patch + +from primus.backends.megatron.training.mlflow_artifacts import ( + _get_all_log_files, + _get_all_trace_files, + upload_artifacts_to_mlflow, + upload_log_files_to_mlflow, + upload_trace_files_to_mlflow, +) + + +class TestGetAllTraceFiles: + """Test trace file discovery logic.""" + + def test_finds_json_trace_files(self, tmp_path): + """Should find .pt.trace.json files.""" + trace_file = tmp_path / "rank_0_step_2.pt.trace.json" + trace_file.touch() + + files = _get_all_trace_files(str(tmp_path)) + + assert len(files) == 1 + assert str(trace_file) in files + + def test_finds_gzipped_trace_files(self, tmp_path): + """Should find .pt.trace.json.gz files.""" + trace_file = tmp_path / "rank_0_step_2.pt.trace.json.gz" + trace_file.touch() + + files = _get_all_trace_files(str(tmp_path)) + + assert len(files) == 1 + assert str(trace_file) in files + + def test_finds_nested_trace_files(self, tmp_path): + """Should find trace files in subdirectories.""" + subdir = tmp_path / "subdir" + subdir.mkdir() + trace_file = subdir / "rank_1.pt.trace.json" + trace_file.touch() + + files = _get_all_trace_files(str(tmp_path)) + + assert len(files) == 1 + assert str(trace_file) in files + + def test_returns_empty_for_nonexistent_dir(self): + """Should return empty list for non-existent directory.""" + files = _get_all_trace_files("/nonexistent/path") + + assert files == [] + + def test_returns_empty_for_none(self): + """Should return empty list for None input.""" + files = _get_all_trace_files(None) + + assert files == [] + + def test_handles_special_characters_in_path(self, tmp_path): + """Should handle paths with special glob characters like [].""" + # Create directory with brackets in name (common in experiment names) + special_dir = tmp_path / "exp[rank0]_test" + special_dir.mkdir() + trace_file = special_dir / "trace.pt.trace.json" + trace_file.touch() + + files = _get_all_trace_files(str(special_dir)) + + assert len(files) == 1 + assert str(trace_file) in files + + def test_deduplicates_files(self, tmp_path): + """Should not return duplicate file paths.""" + trace_file = tmp_path / "rank_0.pt.trace.json" + trace_file.touch() + + files = _get_all_trace_files(str(tmp_path)) + + # Each file should appear only once + assert len(files) == len(set(files)) + + +class TestGetAllLogFiles: + """Test log file discovery logic.""" + + def test_finds_log_files(self, tmp_path): + """Should find .log files in logs directory.""" + logs_dir = tmp_path / "logs" + logs_dir.mkdir() + log_file = logs_dir / "training.log" + log_file.touch() + + files = _get_all_log_files(str(tmp_path)) + + assert len(files) == 1 + assert str(log_file) in files + + def test_finds_nested_log_files(self, tmp_path): + """Should find log files in nested directories.""" + logs_dir = tmp_path / "logs" / "rank-0" + logs_dir.mkdir(parents=True) + log_file = logs_dir / "debug.log" + log_file.touch() + + files = _get_all_log_files(str(tmp_path)) + + assert len(files) == 1 + assert str(log_file) in files + + def test_returns_empty_when_no_logs_dir(self, tmp_path): + """Should return empty list when logs directory doesn't exist.""" + files = _get_all_log_files(str(tmp_path)) + + assert files == [] + + def test_returns_empty_for_none(self): + """Should return empty list for None input.""" + files = _get_all_log_files(None) + + assert files == [] + + +class TestUploadTraceFilesToMlflow: + """Test trace file upload functionality.""" + + def test_returns_zero_when_no_writer(self, tmp_path): + """Should return 0 when mlflow_writer is None.""" + count = upload_trace_files_to_mlflow(None, str(tmp_path)) + + assert count == 0 + + def test_returns_zero_when_no_files(self, tmp_path): + """Should return 0 when no trace files found.""" + mlflow_mock = MagicMock() + + count = upload_trace_files_to_mlflow(mlflow_mock, str(tmp_path)) + + assert count == 0 + mlflow_mock.log_artifact.assert_not_called() + + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + def test_uploads_trace_files(self, mock_warning, mock_log, tmp_path): + """Should upload trace files and return count.""" + trace_file = tmp_path / "rank_0.pt.trace.json" + trace_file.touch() + mlflow_mock = MagicMock() + + count = upload_trace_files_to_mlflow(mlflow_mock, str(tmp_path)) + + assert count == 1 + mlflow_mock.log_artifact.assert_called_once() + + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + def test_handles_upload_error(self, mock_warning, mock_log, tmp_path): + """Should continue on upload error and log warning.""" + trace_file = tmp_path / "rank_0.pt.trace.json" + trace_file.touch() + mlflow_mock = MagicMock() + mlflow_mock.log_artifact.side_effect = Exception("Upload failed") + + count = upload_trace_files_to_mlflow(mlflow_mock, str(tmp_path)) + + assert count == 0 + mock_warning.assert_called() + + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + def test_preserves_relative_path(self, mock_warning, mock_log, tmp_path): + """Should preserve subdirectory structure in artifact path.""" + subdir = tmp_path / "subdir" + subdir.mkdir() + trace_file = subdir / "rank_0.pt.trace.json" + trace_file.touch() + mlflow_mock = MagicMock() + + upload_trace_files_to_mlflow(mlflow_mock, str(tmp_path)) + + # Check that artifact_path includes subdirectory + call_args = mlflow_mock.log_artifact.call_args + assert "subdir" in call_args.kwargs.get("artifact_path", "") + + +class TestUploadLogFilesToMlflow: + """Test log file upload functionality.""" + + def test_returns_zero_when_no_writer(self, tmp_path): + """Should return 0 when mlflow_writer is None.""" + count = upload_log_files_to_mlflow(None, str(tmp_path)) + + assert count == 0 + + def test_returns_zero_when_no_files(self, tmp_path): + """Should return 0 when no log files found.""" + mlflow_mock = MagicMock() + + count = upload_log_files_to_mlflow(mlflow_mock, str(tmp_path)) + + assert count == 0 + + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + def test_uploads_log_files(self, mock_warning, mock_log, tmp_path): + """Should upload log files and return count.""" + logs_dir = tmp_path / "logs" + logs_dir.mkdir() + log_file = logs_dir / "training.log" + log_file.touch() + mlflow_mock = MagicMock() + + count = upload_log_files_to_mlflow(mlflow_mock, str(tmp_path)) + + assert count == 1 + mlflow_mock.log_artifact.assert_called_once() + + +class TestUploadArtifactsToMlflow: + """Test main artifact upload entry point.""" + + def test_returns_zeros_when_no_writer(self): + """Should return zero counts when mlflow_writer is None.""" + result = upload_artifacts_to_mlflow(None) + + assert result == {"traces": 0, "logs": 0} + + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + def test_respects_upload_traces_flag(self, mock_warning, mock_log, tmp_path): + """Should skip trace upload when upload_traces=False.""" + trace_file = tmp_path / "rank_0.pt.trace.json" + trace_file.touch() + mlflow_mock = MagicMock() + + result = upload_artifacts_to_mlflow( + mlflow_mock, + tensorboard_dir=str(tmp_path), + upload_traces=False, + upload_logs=False, + ) + + assert result["traces"] == 0 + mlflow_mock.log_artifact.assert_not_called() + + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + def test_warns_for_multi_node(self, mock_warning, mock_log, tmp_path, monkeypatch): + """Should warn when multi-node training is detected.""" + monkeypatch.setenv("NNODES", "2") + mlflow_mock = MagicMock() + + upload_artifacts_to_mlflow(mlflow_mock, tensorboard_dir=str(tmp_path)) + + # Check that warning was called with multi-node message + warning_calls = [str(call) for call in mock_warning.call_args_list] + assert any("Multi-node" in str(call) for call in warning_calls) From f6fb2b03ec8ea5491b18bfba134ccd89b29c656f Mon Sep 17 00:00:00 2001 From: guangphu Date: Tue, 3 Feb 2026 11:33:16 +0000 Subject: [PATCH 13/24] Use log_rank_last for MLflow artifact logging MLflow is initialized on the last rank, so use log_rank_last instead of log_rank_0 to ensure messages are visible. Added _log_warning helper for warning messages. --- .../megatron/training/mlflow_artifacts.py | 49 +++++++++++-------- .../megatron/test_mlflow_artifacts.py | 24 ++++----- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index f37408e61..b410c53a6 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -26,7 +26,16 @@ import os from typing import Optional -from primus.modules.module_utils import log_rank_0, warning_rank_0 +from primus.modules.module_utils import log_rank_last + +# Note: This module is called on the last rank (where MLflow is initialized). +# Using log_rank_last ensures messages are visible. For warnings, we prefix +# with [WARNING] since warning_rank_last doesn't exist. + + +def _log_warning(msg: str) -> None: + """Log a warning message on the last rank.""" + log_rank_last(f"[WARNING] {msg}") def _get_all_trace_files(tensorboard_dir: str) -> list: @@ -117,15 +126,15 @@ def upload_trace_files_to_mlflow( if mlflow_writer is None: return 0 - log_rank_0(f"[MLflow] Searching for trace files in: {tensorboard_dir}") + log_rank_last(f"[MLflow] Searching for trace files in: {tensorboard_dir}") trace_files = _get_all_trace_files(tensorboard_dir) if len(trace_files) > 5: - log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files[:5]}...") + log_rank_last(f"[MLflow] Found {len(trace_files)} trace files: {trace_files[:5]}...") else: - log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files}") + log_rank_last(f"[MLflow] Found {len(trace_files)} trace files: {trace_files}") if not trace_files: - log_rank_0("[MLflow] No trace files found to upload") + log_rank_last("[MLflow] No trace files found to upload") return 0 total_files = len(trace_files) @@ -133,7 +142,7 @@ def upload_trace_files_to_mlflow( # Warn about potentially long upload times for large uploads if total_files > 10: total_size_mb = sum(os.path.getsize(f) for f in trace_files) / (1024 * 1024) - warning_rank_0( + _log_warning( f"[MLflow] Uploading {total_files} trace files ({total_size_mb:.1f} MB total). " "This may take a while..." ) @@ -153,14 +162,14 @@ def upload_trace_files_to_mlflow( mlflow_writer.log_artifact(trace_file, artifact_path=artifact_subpath) uploaded_count += 1 # Progress logging with counter - log_rank_0( + log_rank_last( f"[MLflow] Uploaded trace file ({uploaded_count}/{total_files}): " f"{os.path.basename(trace_file)}" ) except Exception as e: - warning_rank_0(f"[MLflow] Failed to upload trace file {trace_file}: {e}") + _log_warning(f"[MLflow] Failed to upload trace file {trace_file}: {e}") - log_rank_0(f"[MLflow] Uploaded {uploaded_count}/{total_files} trace files to '{artifact_path}'") + log_rank_last(f"[MLflow] Uploaded {uploaded_count}/{total_files} trace files to '{artifact_path}'") return uploaded_count @@ -190,7 +199,7 @@ def upload_log_files_to_mlflow( log_files = _get_all_log_files(exp_root_path) if not log_files: - log_rank_0("[MLflow] No log files found to upload") + log_rank_last("[MLflow] No log files found to upload") return 0 total_files = len(log_files) @@ -198,7 +207,7 @@ def upload_log_files_to_mlflow( # Warn about potentially long upload times for large uploads if total_files > 20: total_size_mb = sum(os.path.getsize(f) for f in log_files) / (1024 * 1024) - warning_rank_0( + _log_warning( f"[MLflow] Uploading {total_files} log files ({total_size_mb:.1f} MB total). " "This may take a while..." ) @@ -219,9 +228,9 @@ def upload_log_files_to_mlflow( mlflow_writer.log_artifact(log_file, artifact_path=artifact_subpath) uploaded_count += 1 except Exception as e: - warning_rank_0(f"[MLflow] Failed to upload log file {log_file}: {e}") + _log_warning(f"[MLflow] Failed to upload log file {log_file}: {e}") - log_rank_0(f"[MLflow] Uploaded {uploaded_count}/{total_files} log files to '{artifact_path}'") + log_rank_last(f"[MLflow] Uploaded {uploaded_count}/{total_files} log files to '{artifact_path}'") return uploaded_count @@ -258,22 +267,22 @@ def upload_artifacts_to_mlflow( } """ if mlflow_writer is None: - log_rank_0("[MLflow] MLflow writer not available, skipping artifact upload") + log_rank_last("[MLflow] MLflow writer not available, skipping artifact upload") return {"traces": 0, "logs": 0} # Warn about multi-node shared storage requirement nnodes = int(os.environ.get("NNODES", os.environ.get("SLURM_NNODES", "1"))) if nnodes > 1: - warning_rank_0( + _log_warning( f"[MLflow] Multi-node training detected ({nnodes} nodes). " "Ensure shared storage (e.g., NFS) is used for complete artifact uploads. " "Only files accessible from this node will be uploaded." ) - log_rank_0("[MLflow] Starting artifact upload to MLflow...") - log_rank_0(f"[MLflow] tensorboard_dir: {tensorboard_dir}") - log_rank_0(f"[MLflow] exp_root_path: {exp_root_path}") - log_rank_0(f"[MLflow] upload_traces: {upload_traces}, upload_logs: {upload_logs}") + log_rank_last("[MLflow] Starting artifact upload to MLflow...") + log_rank_last(f"[MLflow] tensorboard_dir: {tensorboard_dir}") + log_rank_last(f"[MLflow] exp_root_path: {exp_root_path}") + log_rank_last(f"[MLflow] upload_traces: {upload_traces}, upload_logs: {upload_logs}") result = {"traces": 0, "logs": 0} @@ -285,7 +294,7 @@ def upload_artifacts_to_mlflow( if upload_logs and exp_root_path: result["logs"] = upload_log_files_to_mlflow(mlflow_writer, exp_root_path, artifact_path="logs") - log_rank_0( + log_rank_last( f"[MLflow] Artifact upload complete: {result['traces']} trace files, {result['logs']} log files" ) diff --git a/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py index 686c9c3cb..14f92e936 100644 --- a/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py +++ b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py @@ -154,8 +154,8 @@ def test_returns_zero_when_no_files(self, tmp_path): assert count == 0 mlflow_mock.log_artifact.assert_not_called() - @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") - @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_last") + @patch("primus.backends.megatron.training.mlflow_artifacts._log_warning") def test_uploads_trace_files(self, mock_warning, mock_log, tmp_path): """Should upload trace files and return count.""" trace_file = tmp_path / "rank_0.pt.trace.json" @@ -167,8 +167,8 @@ def test_uploads_trace_files(self, mock_warning, mock_log, tmp_path): assert count == 1 mlflow_mock.log_artifact.assert_called_once() - @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") - @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_last") + @patch("primus.backends.megatron.training.mlflow_artifacts._log_warning") def test_handles_upload_error(self, mock_warning, mock_log, tmp_path): """Should continue on upload error and log warning.""" trace_file = tmp_path / "rank_0.pt.trace.json" @@ -181,8 +181,8 @@ def test_handles_upload_error(self, mock_warning, mock_log, tmp_path): assert count == 0 mock_warning.assert_called() - @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") - @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_last") + @patch("primus.backends.megatron.training.mlflow_artifacts._log_warning") def test_preserves_relative_path(self, mock_warning, mock_log, tmp_path): """Should preserve subdirectory structure in artifact path.""" subdir = tmp_path / "subdir" @@ -215,8 +215,8 @@ def test_returns_zero_when_no_files(self, tmp_path): assert count == 0 - @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") - @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_last") + @patch("primus.backends.megatron.training.mlflow_artifacts._log_warning") def test_uploads_log_files(self, mock_warning, mock_log, tmp_path): """Should upload log files and return count.""" logs_dir = tmp_path / "logs" @@ -240,8 +240,8 @@ def test_returns_zeros_when_no_writer(self): assert result == {"traces": 0, "logs": 0} - @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") - @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_last") + @patch("primus.backends.megatron.training.mlflow_artifacts._log_warning") def test_respects_upload_traces_flag(self, mock_warning, mock_log, tmp_path): """Should skip trace upload when upload_traces=False.""" trace_file = tmp_path / "rank_0.pt.trace.json" @@ -258,8 +258,8 @@ def test_respects_upload_traces_flag(self, mock_warning, mock_log, tmp_path): assert result["traces"] == 0 mlflow_mock.log_artifact.assert_not_called() - @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_0") - @patch("primus.backends.megatron.training.mlflow_artifacts.warning_rank_0") + @patch("primus.backends.megatron.training.mlflow_artifacts.log_rank_last") + @patch("primus.backends.megatron.training.mlflow_artifacts._log_warning") def test_warns_for_multi_node(self, mock_warning, mock_log, tmp_path, monkeypatch): """Should warn when multi-node training is detected.""" monkeypatch.setenv("NNODES", "2") From e304bb3bb333f2a4552266388e1ea157596710b5 Mon Sep 17 00:00:00 2001 From: guangphu Date: Tue, 3 Feb 2026 11:37:06 +0000 Subject: [PATCH 14/24] Extract _finalize_mlflow_run helper to reduce duplication Consolidate MLflow finalization logic (barrier, upload, end_run) into a single helper function called from both normal completion and exit condition paths. --- primus/modules/trainer/megatron/trainer.py | 48 +++++++++++++--------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 8f063d146..982cd25c3 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -178,6 +178,32 @@ set_train_start_time() +def _finalize_mlflow_run(args, mlflow_writer) -> None: + """ + Finalize MLflow run: sync ranks, upload artifacts, and end the run. + + This helper function consolidates the MLflow finalization logic to avoid + code duplication between normal training completion and exit conditions. + + Args: + args: Megatron arguments containing mlflow_upload_traces/logs settings + mlflow_writer: The MLflow writer instance (or None if not enabled) + """ + if mlflow_writer is None: + return + + # Barrier to ensure all ranks have finished writing files before upload + if dist.is_initialized(): + dist.barrier() + + # Upload artifacts before ending the run + upload_mlflow_artifacts( + upload_traces=getattr(args, "mlflow_upload_traces", True), + upload_logs=getattr(args, "mlflow_upload_logs", True), + ) + mlflow_writer.end_run() + + class MegatronTrainer(BaseTrainer, BaseModule): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1126,16 +1152,7 @@ def run(self, *args, **kwargs): ft_integration.on_checkpointing_end(is_async_finalization=True) mlflow_writer = get_mlflow_writer() - if mlflow_writer: - # Barrier to ensure all ranks have finished writing files before upload - if dist.is_initialized(): - dist.barrier() - # Upload artifacts before ending the run - upload_mlflow_artifacts( - upload_traces=getattr(args, "mlflow_upload_traces", True), - upload_logs=getattr(args, "mlflow_upload_logs", True), - ) - mlflow_writer.end_run() + _finalize_mlflow_run(args, mlflow_writer) one_logger and one_logger.log_metrics({"app_finish_time": one_logger_utils.get_timestamp_in_ms()}) @@ -1578,16 +1595,7 @@ def get_e2e_base_metrics(): if wandb_writer: wandb_writer.finish() mlflow_writer = get_mlflow_writer() - if mlflow_writer: - # Barrier to ensure all ranks have finished writing files before upload - if dist.is_initialized(): - dist.barrier() - # Upload artifacts before ending the run - upload_mlflow_artifacts( - upload_traces=getattr(args, "mlflow_upload_traces", True), - upload_logs=getattr(args, "mlflow_upload_logs", True), - ) - mlflow_writer.end_run() + _finalize_mlflow_run(args, mlflow_writer) ft_integration.shutdown() sys.exit(exit_code) From f5c0188eea2be16daf885425a79534daff0352b7 Mon Sep 17 00:00:00 2001 From: guangphu Date: Tue, 3 Feb 2026 11:40:09 +0000 Subject: [PATCH 15/24] Fix copyright header in mlflow_setup.py (new AMD file) --- primus/backends/megatron/training/mlflow_setup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/primus/backends/megatron/training/mlflow_setup.py b/primus/backends/megatron/training/mlflow_setup.py index 119f394e9..e68e04835 100644 --- a/primus/backends/megatron/training/mlflow_setup.py +++ b/primus/backends/megatron/training/mlflow_setup.py @@ -1,6 +1,5 @@ ############################################################################### -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. -# Modification Copyright© 2025 Advanced Micro Devices, Inc. All rights reserved. +# Copyright (c) 2025, Advanced Micro Devices, Inc. All rights reserved. # # See LICENSE for license information. ############################################################################### From 548a66caf4d36259b786218343c348a94393cdf8 Mon Sep 17 00:00:00 2001 From: guangphu Date: Tue, 3 Feb 2026 11:47:43 +0000 Subject: [PATCH 16/24] Address Copilot review comments for mlflow_artifacts.py - Add List to imports and improve type hints: - _get_all_trace_files(tensorboard_dir: Optional[str]) -> List[str] - _get_all_log_files(exp_root_path: Optional[str]) -> List[str] - Handle FileNotFoundError when calculating file sizes (race condition) - Fix docstring: MLflow runs on last rank only, not rank 0 --- .../megatron/training/mlflow_artifacts.py | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index b410c53a6..8c8f5e1fc 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -24,7 +24,7 @@ import glob import os -from typing import Optional +from typing import List, Optional from primus.modules.module_utils import log_rank_last @@ -38,7 +38,7 @@ def _log_warning(msg: str) -> None: log_rank_last(f"[WARNING] {msg}") -def _get_all_trace_files(tensorboard_dir: str) -> list: +def _get_all_trace_files(tensorboard_dir: Optional[str]) -> List[str]: """ Find all profiler trace files in the tensorboard directory. @@ -75,7 +75,7 @@ def _get_all_trace_files(tensorboard_dir: str) -> list: return unique_files -def _get_all_log_files(exp_root_path: str) -> list: +def _get_all_log_files(exp_root_path: Optional[str]) -> List[str]: """ Find all log files in the experiment logs directory. @@ -112,8 +112,8 @@ def upload_trace_files_to_mlflow( Upload all profiler trace files to MLflow as artifacts. This function collects trace files from the tensorboard directory and - uploads them to MLflow. In distributed settings, only rank 0 (or the - last rank where MLflow writer is initialized) should call this. + uploads them to MLflow. In distributed settings, only the last rank + (world_size - 1) where MLflow writer is initialized should call this. Args: mlflow_writer: The MLflow module instance (from get_mlflow_writer()) @@ -141,7 +141,14 @@ def upload_trace_files_to_mlflow( # Warn about potentially long upload times for large uploads if total_files > 10: - total_size_mb = sum(os.path.getsize(f) for f in trace_files) / (1024 * 1024) + # Safely calculate total size (files may be deleted between discovery and size check) + total_size_bytes = 0 + for f in trace_files: + try: + total_size_bytes += os.path.getsize(f) + except OSError: + pass # File may have been deleted + total_size_mb = total_size_bytes / (1024 * 1024) _log_warning( f"[MLflow] Uploading {total_files} trace files ({total_size_mb:.1f} MB total). " "This may take a while..." @@ -206,7 +213,14 @@ def upload_log_files_to_mlflow( # Warn about potentially long upload times for large uploads if total_files > 20: - total_size_mb = sum(os.path.getsize(f) for f in log_files) / (1024 * 1024) + # Safely calculate total size (files may be deleted between discovery and size check) + total_size_bytes = 0 + for f in log_files: + try: + total_size_bytes += os.path.getsize(f) + except OSError: + pass # File may have been deleted + total_size_mb = total_size_bytes / (1024 * 1024) _log_warning( f"[MLflow] Uploading {total_files} log files ({total_size_mb:.1f} MB total). " "This may take a while..." From d7edba8c687e4560e7f7dad33d99413c89f506fd Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 5 Feb 2026 13:43:36 +0000 Subject: [PATCH 17/24] feat: auto-enable mlflow and profiling for upload flags When mlflow_upload_traces or mlflow_upload_logs is True: - Auto-enable mlflow (set disable_mlflow=False) - Auto-enable profiling if trace upload is requested This removes the need to explicitly set: - --disable_mlflow=False - --profile=True - --use_pytorch_profiler=True --- primus/modules/trainer/megatron/trainer.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 982cd25c3..3bfb1fefc 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -462,6 +462,25 @@ def update_primus_config( log_kv_rank_0(f" -wandb_save_dir", f"{args.wandb_save_dir}") log_kv_rank_0(f" -wandb_entity", f"{args.wandb_entity}") + # mlflow - auto-enable dependencies + # If mlflow_upload_traces or mlflow_upload_logs is True, auto-enable mlflow + mlflow_upload_flags = [ + getattr(args, "mlflow_upload_traces", False), + getattr(args, "mlflow_upload_logs", False), + ] + if any(mlflow_upload_flags) and args.disable_mlflow: + args.disable_mlflow = False + debug_rank_0("Auto-enabled MLflow (disable_mlflow=False) because mlflow_upload_* flags are set") + + # If uploading traces, auto-enable profiling + if getattr(args, "mlflow_upload_traces", False): + if not getattr(args, "profile", False): + args.profile = True + debug_rank_0("Auto-enabled profile=True for mlflow trace upload") + if not getattr(args, "use_pytorch_profiler", False): + args.use_pytorch_profiler = True + debug_rank_0("Auto-enabled use_pytorch_profiler=True for mlflow trace upload") + # mlflow log_kv_rank_0(f"-disable_mlflow", f"{args.disable_mlflow}") if not args.disable_mlflow: From c03fba59569ff5e340e59718218ff1a8054db228 Mon Sep 17 00:00:00 2001 From: guangphu Date: Thu, 5 Feb 2026 14:16:35 +0000 Subject: [PATCH 18/24] fix: auto-enable tensorboard when profiling is enabled The profiler saves traces to tensorboard_dir, which is None when tensorboard is disabled. This caused a TypeError during trace save. Moved auto-enable logic before tensorboard section and added tensorboard auto-enable when mlflow_upload_traces is True. --- primus/modules/trainer/megatron/trainer.py | 41 ++++++++++++---------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 3bfb1fefc..ddff44744 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -425,6 +425,28 @@ def update_primus_config( else: log_rank_0(f"-{latest_file} does not exist, skip auto_continue_train.") + # Auto-enable dependencies for mlflow upload flags + # This must run BEFORE tensorboard section to ensure paths are set correctly + mlflow_upload_flags = [ + getattr(args, "mlflow_upload_traces", False), + getattr(args, "mlflow_upload_logs", False), + ] + if any(mlflow_upload_flags) and args.disable_mlflow: + args.disable_mlflow = False + debug_rank_0("Auto-enabled MLflow (disable_mlflow=False) because mlflow_upload_* flags are set") + + # If uploading traces, auto-enable profiling and tensorboard + if getattr(args, "mlflow_upload_traces", False): + if not getattr(args, "profile", False): + args.profile = True + debug_rank_0("Auto-enabled profile=True for mlflow trace upload") + if not getattr(args, "use_pytorch_profiler", False): + args.use_pytorch_profiler = True + debug_rank_0("Auto-enabled use_pytorch_profiler=True for mlflow trace upload") + if getattr(args, "disable_tensorboard", True): + args.disable_tensorboard = False + debug_rank_0("Auto-enabled tensorboard (disable_tensorboard=False) for profiler trace output") + # tensorboard if not args.disable_tensorboard: tb_path = os.path.abspath(os.path.join(exp_root_path, "tensorboard")) @@ -462,25 +484,6 @@ def update_primus_config( log_kv_rank_0(f" -wandb_save_dir", f"{args.wandb_save_dir}") log_kv_rank_0(f" -wandb_entity", f"{args.wandb_entity}") - # mlflow - auto-enable dependencies - # If mlflow_upload_traces or mlflow_upload_logs is True, auto-enable mlflow - mlflow_upload_flags = [ - getattr(args, "mlflow_upload_traces", False), - getattr(args, "mlflow_upload_logs", False), - ] - if any(mlflow_upload_flags) and args.disable_mlflow: - args.disable_mlflow = False - debug_rank_0("Auto-enabled MLflow (disable_mlflow=False) because mlflow_upload_* flags are set") - - # If uploading traces, auto-enable profiling - if getattr(args, "mlflow_upload_traces", False): - if not getattr(args, "profile", False): - args.profile = True - debug_rank_0("Auto-enabled profile=True for mlflow trace upload") - if not getattr(args, "use_pytorch_profiler", False): - args.use_pytorch_profiler = True - debug_rank_0("Auto-enabled use_pytorch_profiler=True for mlflow trace upload") - # mlflow log_kv_rank_0(f"-disable_mlflow", f"{args.disable_mlflow}") if not args.disable_mlflow: From 7a87c510b654855904147f650f96fedeb82f38ab Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 9 Feb 2026 12:25:56 +0000 Subject: [PATCH 19/24] Keep MLflow opt-in: do not override disable_mlflow from upload flags Co-authored-by: Cursor --- primus/modules/trainer/megatron/trainer.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 8d4391919..50906ca03 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -425,18 +425,14 @@ def update_primus_config( else: log_rank_0(f"-{latest_file} does not exist, skip auto_continue_train.") - # Auto-enable dependencies for mlflow upload flags - # This must run BEFORE tensorboard section to ensure paths are set correctly - mlflow_upload_flags = [ - getattr(args, "mlflow_upload_traces", False), - getattr(args, "mlflow_upload_logs", False), - ] - if any(mlflow_upload_flags) and args.disable_mlflow: - args.disable_mlflow = False - debug_rank_0("Auto-enabled MLflow (disable_mlflow=False) because mlflow_upload_* flags are set") - - # If uploading traces, auto-enable profiling and tensorboard - if getattr(args, "mlflow_upload_traces", False): + # If uploading traces (or tracelens) to MLflow, auto-enable profiling and tensorboard. + # Only when MLflow is enabled (disable_mlflow=False); we do not override disable_mlflow + # so MLflow remains opt-in and users with disable_mlflow: true are not surprised. + needs_profiling = ( + getattr(args, "mlflow_upload_traces", False) + or getattr(args, "mlflow_upload_tracelens_report", False) + ) and not args.disable_mlflow + if needs_profiling: if not getattr(args, "profile", False): args.profile = True debug_rank_0("Auto-enabled profile=True for mlflow trace upload") From 1e0f3b8fcac40d084f578925599e88ebb59f9161 Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 9 Feb 2026 14:47:23 +0000 Subject: [PATCH 20/24] Fix barrier deadlock: run barrier before mlflow_writer check so all ranks participate Co-authored-by: Cursor --- primus/modules/trainer/megatron/trainer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 50906ca03..ccf88a1fa 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -189,13 +189,14 @@ def _finalize_mlflow_run(args, mlflow_writer) -> None: args: Megatron arguments containing mlflow_upload_traces/logs settings mlflow_writer: The MLflow writer instance (or None if not enabled) """ - if mlflow_writer is None: - return - - # Barrier to ensure all ranks have finished writing files before upload + # Barrier to ensure all ranks have finished writing files before upload. + # Must run on ALL ranks to avoid deadlock (only last rank has mlflow_writer). if dist.is_initialized(): dist.barrier() + if mlflow_writer is None: + return + # Upload artifacts before ending the run upload_mlflow_artifacts( upload_traces=getattr(args, "mlflow_upload_traces", True), From f1fa6a18b7f2b0784befee50168a0434d9b19db8 Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 9 Feb 2026 14:50:04 +0000 Subject: [PATCH 21/24] Guard NNODES/SLURM_NNODES parse: catch ValueError and default to 1 node Co-authored-by: Cursor --- primus/backends/megatron/training/mlflow_artifacts.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index 8c8f5e1fc..a4b711d1a 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -285,7 +285,11 @@ def upload_artifacts_to_mlflow( return {"traces": 0, "logs": 0} # Warn about multi-node shared storage requirement - nnodes = int(os.environ.get("NNODES", os.environ.get("SLURM_NNODES", "1"))) + try: + nnodes = int(os.environ.get("NNODES", os.environ.get("SLURM_NNODES", "1"))) + except ValueError: + nnodes = 1 + _log_warning("[MLflow] NNODES/SLURM_NNODES could not be parsed as integer; assuming 1 node.") if nnodes > 1: _log_warning( f"[MLflow] Multi-node training detected ({nnodes} nodes). " From 961246bb66820656d11823edf3430e9a4b8747a3 Mon Sep 17 00:00:00 2001 From: guangphu Date: Mon, 9 Feb 2026 14:52:00 +0000 Subject: [PATCH 22/24] Reset exp_root_path on MLflow finalization to avoid stale global in same process Co-authored-by: Cursor --- primus/modules/trainer/megatron/trainer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index ccf88a1fa..778e35ac5 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -149,6 +149,7 @@ set_train_start_time, ) from primus.backends.megatron.training.mlflow_setup import ( + reset_exp_root_path, set_exp_root_path, upload_mlflow_artifacts, ) @@ -195,6 +196,7 @@ def _finalize_mlflow_run(args, mlflow_writer) -> None: dist.barrier() if mlflow_writer is None: + reset_exp_root_path() return # Upload artifacts before ending the run @@ -203,6 +205,8 @@ def _finalize_mlflow_run(args, mlflow_writer) -> None: upload_logs=getattr(args, "mlflow_upload_logs", True), ) mlflow_writer.end_run() + # Reset so subsequent runs in the same process don't use a stale path + reset_exp_root_path() class MegatronTrainer(BaseTrainer, BaseModule): From 1fc65ffd861421230d7ed433d148cf718cda1629 Mon Sep 17 00:00:00 2001 From: guangphu Date: Tue, 10 Feb 2026 10:26:14 +0000 Subject: [PATCH 23/24] Improve MLflow artifact upload robustness Scope MLflow artifact imports to call sites, add exception detail and tracebacks, and avoid forcing default upload flags when args omit them. Co-authored-by: Cursor --- .../megatron/training/mlflow_artifacts.py | 17 +++++++++++++---- .../backends/megatron/training/mlflow_setup.py | 3 ++- primus/modules/trainer/megatron/trainer.py | 10 ++++++---- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/primus/backends/megatron/training/mlflow_artifacts.py b/primus/backends/megatron/training/mlflow_artifacts.py index a4b711d1a..0837eccc3 100644 --- a/primus/backends/megatron/training/mlflow_artifacts.py +++ b/primus/backends/megatron/training/mlflow_artifacts.py @@ -24,6 +24,7 @@ import glob import os +import traceback from typing import List, Optional from primus.modules.module_utils import log_rank_last @@ -31,6 +32,12 @@ # Note: This module is called on the last rank (where MLflow is initialized). # Using log_rank_last ensures messages are visible. For warnings, we prefix # with [WARNING] since warning_rank_last doesn't exist. +try: + from mlflow.exceptions import MlflowException +except ModuleNotFoundError: + + class MlflowException(Exception): + """Fallback exception when mlflow isn't available.""" def _log_warning(msg: str) -> None: @@ -173,8 +180,9 @@ def upload_trace_files_to_mlflow( f"[MLflow] Uploaded trace file ({uploaded_count}/{total_files}): " f"{os.path.basename(trace_file)}" ) - except Exception as e: - _log_warning(f"[MLflow] Failed to upload trace file {trace_file}: {e}") + except (OSError, RuntimeError, ValueError, MlflowException) as e: + _log_warning(f"[MLflow] Failed to upload trace file {trace_file}: {type(e).__name__}: {e}") + _log_warning(traceback.format_exc().strip()) log_rank_last(f"[MLflow] Uploaded {uploaded_count}/{total_files} trace files to '{artifact_path}'") return uploaded_count @@ -241,8 +249,9 @@ def upload_log_files_to_mlflow( mlflow_writer.log_artifact(log_file, artifact_path=artifact_subpath) uploaded_count += 1 - except Exception as e: - _log_warning(f"[MLflow] Failed to upload log file {log_file}: {e}") + except (OSError, RuntimeError, ValueError, MlflowException) as e: + _log_warning(f"[MLflow] Failed to upload log file {log_file}: {type(e).__name__}: {e}") + _log_warning(traceback.format_exc().strip()) log_rank_last(f"[MLflow] Uploaded {uploaded_count}/{total_files} log files to '{artifact_path}'") return uploaded_count diff --git a/primus/backends/megatron/training/mlflow_setup.py b/primus/backends/megatron/training/mlflow_setup.py index e68e04835..33a15ae10 100644 --- a/primus/backends/megatron/training/mlflow_setup.py +++ b/primus/backends/megatron/training/mlflow_setup.py @@ -11,7 +11,6 @@ """ from .global_vars import get_args, get_mlflow_writer -from .mlflow_artifacts import upload_artifacts_to_mlflow _GLOBAL_EXP_ROOT_PATH = None @@ -55,6 +54,8 @@ def upload_mlflow_artifacts( if mlflow_writer is None: return None + from .mlflow_artifacts import upload_artifacts_to_mlflow + args = get_args() exp_root_path = get_exp_root_path() tensorboard_dir = getattr(args, "tensorboard_dir", None) diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index 778e35ac5..fa0408c4f 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -200,10 +200,12 @@ def _finalize_mlflow_run(args, mlflow_writer) -> None: return # Upload artifacts before ending the run - upload_mlflow_artifacts( - upload_traces=getattr(args, "mlflow_upload_traces", True), - upload_logs=getattr(args, "mlflow_upload_logs", True), - ) + mlflow_artifact_kwargs = {} + if hasattr(args, "mlflow_upload_traces"): + mlflow_artifact_kwargs["upload_traces"] = args.mlflow_upload_traces + if hasattr(args, "mlflow_upload_logs"): + mlflow_artifact_kwargs["upload_logs"] = args.mlflow_upload_logs + upload_mlflow_artifacts(**mlflow_artifact_kwargs) mlflow_writer.end_run() # Reset so subsequent runs in the same process don't use a stale path reset_exp_root_path() From 1b36c3d05a3761460502c213953d07e4c39c4f6d Mon Sep 17 00:00:00 2001 From: guangphu Date: Wed, 11 Feb 2026 09:40:13 +0000 Subject: [PATCH 24/24] Fix Copilot review notes for PR 440 Align Slurm EXP defaults with local training, ensure finalize barriers wait for uploads, and update tests to match handled exceptions. Co-authored-by: Cursor --- examples/run_slurm_pretrain.sh | 5 +++++ primus/modules/trainer/megatron/trainer.py | 4 ++++ tests/unit_tests/backends/megatron/test_mlflow_artifacts.py | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/examples/run_slurm_pretrain.sh b/examples/run_slurm_pretrain.sh index 04da35a4d..e17f42faa 100755 --- a/examples/run_slurm_pretrain.sh +++ b/examples/run_slurm_pretrain.sh @@ -34,6 +34,11 @@ export NNODES=${NNODES:-1} SCRIPT_DIR=$(dirname "$(realpath "${BASH_SOURCE[0]}")") +# Align EXP default with run_local_pretrain.sh to avoid unknown names +if [[ -z "${EXP:-}" ]]; then + export EXP="${SCRIPT_DIR}/megatron/exp_pretrain.yaml" +fi + export LOG_DIR=${LOG_DIR:-"./output"} LOG_FILE="${LOG_DIR}/log_slurm_pretrain.txt" mkdir -p "$LOG_DIR" diff --git a/primus/modules/trainer/megatron/trainer.py b/primus/modules/trainer/megatron/trainer.py index fa0408c4f..4b41844b7 100644 --- a/primus/modules/trainer/megatron/trainer.py +++ b/primus/modules/trainer/megatron/trainer.py @@ -197,6 +197,8 @@ def _finalize_mlflow_run(args, mlflow_writer) -> None: if mlflow_writer is None: reset_exp_root_path() + if dist.is_initialized(): + dist.barrier() return # Upload artifacts before ending the run @@ -209,6 +211,8 @@ def _finalize_mlflow_run(args, mlflow_writer) -> None: mlflow_writer.end_run() # Reset so subsequent runs in the same process don't use a stale path reset_exp_root_path() + if dist.is_initialized(): + dist.barrier() class MegatronTrainer(BaseTrainer, BaseModule): diff --git a/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py index 14f92e936..60eda4703 100644 --- a/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py +++ b/tests/unit_tests/backends/megatron/test_mlflow_artifacts.py @@ -174,7 +174,7 @@ def test_handles_upload_error(self, mock_warning, mock_log, tmp_path): trace_file = tmp_path / "rank_0.pt.trace.json" trace_file.touch() mlflow_mock = MagicMock() - mlflow_mock.log_artifact.side_effect = Exception("Upload failed") + mlflow_mock.log_artifact.side_effect = RuntimeError("Upload failed") count = upload_trace_files_to_mlflow(mlflow_mock, str(tmp_path))