diff --git a/scripts/performance/setup_experiment.py b/scripts/performance/setup_experiment.py
index 2ee374e7de..6afd3bcd50 100755
--- a/scripts/performance/setup_experiment.py
+++ b/scripts/performance/setup_experiment.py
@@ -59,6 +59,7 @@
 
 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)  # pin level so nemo_run's WARNING root doesn't suppress INFO
 
 
 def check_training_finished(log_file_paths: List[str]) -> bool:
@@ -478,10 +479,23 @@ def main(
     )
 
     logger.info(f"Starting convergence check for {model_family_name}_{model_recipe_name}")
 
+    wandb_run = None
+    # Bug 2 fix: wandb online mode redirects fd 2 at the OS level via os.dup2(),
+    # making all stderr writes invisible. Grab a private copy of fd 2 *before*
+    # wandb.init() so our StreamHandler bypasses wandb's capture pipe.
+    _dup_file = os.fdopen(os.dup(2), "w", buffering=1)
+    _dup_handler = logging.StreamHandler(_dup_file)
+    _dup_handler.setLevel(logging.DEBUG)
+    _dup_handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
+    logger.addHandler(_dup_handler)
+
     if HAVE_WANDB and wandb_key:
         wandb_run = wandb.init(
-            project=wandb_project_name, entity=wandb_entity_name, id=wandb_run_id, resume="allow"
+            project=wandb_project_name,
+            entity=wandb_entity_name,
+            id=wandb_run_id,
+            resume="allow",
         )
 
     logger.info("Waiting 10 seconds for I/O to settle")
@@ -501,12 +515,16 @@ def main(
         performance_config=performance_params,
         memory_config=memory_params,
         wandb_run=wandb_run,
+        _logger=logger,
     )
 
     if wandb_run:
         wandb_run.finish()
         wandb.teardown(exit_code=int(not is_testing_passed))
 
+    logger.removeHandler(_dup_handler)
+    _dup_file.close()
+
     if not is_long_convergence_run:
         n_attempts = max_retries + 1
         is_finished_experiment = True
diff --git a/scripts/performance/utils/evaluate.py b/scripts/performance/utils/evaluate.py
index fcd9c0bc7c..89f83c4453 100644
--- a/scripts/performance/utils/evaluate.py
+++ b/scripts/performance/utils/evaluate.py
@@ -289,85 +289,116 @@ def validate_convergence(
 
 
 def validate_performance(
-    current_values: "np.ndarray",
-    golden_values: "np.ndarray",
+    current_gpu_util_values: "np.ndarray",
+    golden_gpu_util_values: "np.ndarray",
     steps: List[str],
     logger: logging.Logger,
     wandb_run: Optional["wandb.Run"] = None,
     config: Dict[str, Any] = None,
 ) -> Dict[str, Any]:
     """
-    Validate performance metrics.
+    Validate GPU utilization performance metrics.
+
+    Uses signed difference to detect both regressions (GPU util dropped) and
+    unexpected improvements (GPU util jumped suspiciously high).
+
+    Args:
+        current_gpu_util_values: Current GPU utilization values per step
+        golden_gpu_util_values: Golden reference GPU utilization values per step
+        steps: Training step identifiers
+        logger: Logger instance for detailed reporting
+        wandb_run: Optional wandb run object
+        config: Optional configuration dict with custom thresholds
+
+    Returns:
+        Dict with 'passed' boolean and detailed results
     """
+    RED, GREEN, RESET = "\033[31m", "\033[32m", "\033[0m"
     default_config = {
-        # Statistical significance threshold
-        "correlation_threshold": 0.95,
-        # Point-wise tolerances (adaptive based on loss magnitude)
-        "high_loss_tolerance": 0.10,  # 10% for loss > 2.0
-        "medium_loss_tolerance": 0.05,  # 5% for loss 0.5-2.0
-        "low_loss_tolerance": 0.02,  # 2% for loss < 0.5
-        # Curve shape metrics
-        "final_loss_tolerance": 0.03,  # 3% for final loss
-        # Outlier handling
-        "max_outlier_ratio": 0.1,  # Max 10% of points can be outliers
-        "outlier_threshold": 3.0,  # 3-sigma outlier detection
-        # Loss curve analysis
-        "skip_first_percent_loss": 0.0,  # Percentage of loss points to skip from beginning
-        # Performance timing
-        "skip_first_percent_time": 0.1,  # Percentage of iterations to skip from beginning for timing
-        "timing_threshold": 0.05,  # 5% threshold for timing validation
+        "skip_first_percent_time": 0.1,  # Percentage of iterations to skip from beginning
+        "timing_threshold": 0.05,  # 5% threshold for GPU util validation
     }
     if config:
         default_config.update(config)
     config = default_config
 
-    # Discard first N% of iterations for stable timing comparison
-    skip_first_n_percent = max(1, int(len(steps) * config["skip_first_percent_time"]))
-    current_timing_stable = current_values[skip_first_n_percent:]
-    golden_timing_stable = golden_values[skip_first_n_percent:]
+    # Discard first N% of iterations for stable comparison
+    skip = max(1, int(len(steps) * config["skip_first_percent_time"]))
+    current_stable = current_gpu_util_values[skip:]
+    golden_stable = golden_gpu_util_values[skip:]
+
+    current_avg = float(np.nanmean(current_stable))
+    golden_avg = float(np.nanmean(golden_stable))
 
-    # Calculate average step timing
-    current_avg_timing = np.mean(current_timing_stable)
-    golden_avg_timing = np.mean(golden_timing_stable)
+    # Signed diff: positive = improvement (higher util), negative = regression (lower util)
+    signed_diff = (current_avg - golden_avg) / golden_avg if golden_avg != 0 else 0.0
 
-    # Calculate timing difference
-    timing_diff = abs(current_avg_timing - golden_avg_timing) / golden_avg_timing
+    is_regression = signed_diff < -config["timing_threshold"]
+    is_improvement = signed_diff > config["timing_threshold"]
 
     logger.info(
-        f"Step timing comparison (excluding first {config['skip_first_percent_time'] * 100:.1f}% of iterations):"
+        f"GPU utilization comparison (excluding first {config['skip_first_percent_time'] * 100:.1f}% of iterations):"
    )
-    logger.info(f"  Current average timing: {current_avg_timing:.4f}s")
-    logger.info(f"  Golden average timing: {golden_avg_timing:.4f}s")
-    logger.info(f"  Timing difference: {timing_diff:.4f} ({timing_diff * 100:.2f}%)")
-    logger.info(f"  Threshold: {config['timing_threshold'] * 100:.1f}%")
+    logger.info(f"  Current average GPU util: {current_avg:.4f}%")
+    logger.info(f"  Golden average GPU util: {golden_avg:.4f}%")
+    logger.info(f"  Signed diff: {signed_diff * 100:.2f}%")
+    logger.info(f"  Threshold: ±{config['timing_threshold'] * 100:.1f}%")
 
     results = {"passed": True, "failed_metrics": [], "summary": "", "details": "", "metrics": {}}
 
-    if timing_diff > config["timing_threshold"]:
+    results["metrics"]["current_avg_gpu_util"] = current_avg
+    results["metrics"]["golden_avg_gpu_util"] = golden_avg
+    results["metrics"]["signed_diff"] = signed_diff
+    results["metrics"]["threshold"] = config["timing_threshold"]
+
+    if is_regression:
         logger.warning(
-            f"Step timing validation FAILED: {timing_diff * 100:.2f}% > {config['timing_threshold'] * 100:.1f}%"
+            f"{RED}REGRESSION{RESET}: GPU util dropped {abs(signed_diff) * 100:.2f}% "
+            f"(current={current_avg:.4f}%, golden={golden_avg:.4f}%, "
+            f"threshold={config['timing_threshold'] * 100:.1f}%)"
         )
-        # Add timing failure to convergence result
         results["passed"] = False
-        results["failed_metrics"].append("step_timing")
-        results["summary"] = f"Failed {len(results['failed_metrics'])} out of 1 tests"
-        results["timing_diff"] = timing_diff
-        results["timing_threshold"] = config["timing_threshold"]
+        results["failed_metrics"].append("gpu_util_regression")
+        results["metrics"]["direction"] = "regression"
+    elif is_improvement:
+        logger.warning(
+            f"{GREEN}UNEXPECTED IMPROVEMENT{RESET}: GPU util jumped {signed_diff * 100:.2f}% "
+            f"(current={current_avg:.4f}%, golden={golden_avg:.4f}%, "
+            f"threshold={config['timing_threshold'] * 100:.1f}%)"
+        )
+        results["passed"] = False
+        results["failed_metrics"].append("gpu_util_improvement")
+        results["metrics"]["direction"] = "improvement"
     else:
-        results["passed"] = True
         logger.info(
-            f"✓ Step timing validation passed: {timing_diff * 100:.2f}% <= {config['timing_threshold'] * 100:.1f}%"
+            f"✓ GPU utilization passed: {signed_diff * 100:.2f}% within "
+            f"±{config['timing_threshold'] * 100:.1f}% threshold"
        )
+        results["metrics"]["direction"] = "pass"
+
+    if results["passed"]:
+        results["summary"] = "All performance tests passed"
+        logger.info("🎉 All performance tests PASSED!")
+    else:
+        direction = results["metrics"]["direction"]
+        results["summary"] = f"Failed 1 out of 1 test ({direction})"
+        logger.error(f"❌ Performance validation FAILED: {results['summary']}")
 
     if wandb_run is not None:
-        wandb_run.summary["current_avg_timing"] = current_avg_timing
-        wandb_run.summary["golden_avg_timing"] = golden_avg_timing
-        wandb_run.summary["timing_diff"] = timing_diff
-        wandb_run.summary["timing_threshold"] = config["timing_threshold"]
+        wandb_run.summary["current_avg_gpu_util"] = current_avg
+        wandb_run.summary["golden_avg_gpu_util"] = golden_avg
+        wandb_run.summary["gpu_util_signed_diff"] = signed_diff
+        wandb_run.summary["gpu_util_threshold"] = config["timing_threshold"]
         wandb_run.summary["performance_passed"] = results["passed"]
 
+    for key, value in results["metrics"].items():
+        if isinstance(value, float):
+            logger.info(f"  {key}: {value:.6f}")
+        else:
+            logger.info(f"  {key}: {value}")
+
     return results
@@ -381,7 +412,7 @@ def validate_memory(
     config: Dict[str, Any] = None,
 ) -> Dict[str, Any]:
     """
-    Validate performance metrics.
+    Validate memory metrics.
     """
 
     default_config = {
@@ -392,50 +423,48 @@
         default_config.update(config)
     config = default_config
 
-    # Calculate memory difference
+    # Calculate memory differences
     max_alloc_diff = (
         abs(current_max_alloc - golden_max_alloc) / golden_max_alloc
         if golden_max_alloc != 0
         else abs(current_max_alloc)
     )
+    alloc_diff = abs(current_alloc - golden_alloc) / golden_alloc if golden_alloc != 0 else abs(current_alloc)
 
     logger.info(f"Max alloc difference: {max_alloc_diff * 100:.2f}%")
     logger.info(f"Memory threshold: {config['memory_threshold'] * 100:.1f}%")
     logger.info(f"Current max alloc: {current_max_alloc}")
     logger.info(f"Golden max alloc: {golden_max_alloc}")
+    logger.info(f"Alloc difference: {alloc_diff * 100:.2f}%")
+    logger.info(f"Current alloc: {current_alloc}")
+    logger.info(f"Golden alloc: {golden_alloc}")
 
     results = {"passed": True, "failed_metrics": [], "summary": "", "details": "", "metrics": {}}
 
+    results["metrics"]["current_max_alloc"] = current_max_alloc
+    results["metrics"]["golden_max_alloc"] = golden_max_alloc
+    results["metrics"]["max_alloc_diff"] = max_alloc_diff
+    results["metrics"]["current_alloc"] = current_alloc
+    results["metrics"]["golden_alloc"] = golden_alloc
+    results["metrics"]["alloc_diff"] = alloc_diff
+    results["metrics"]["threshold"] = config["memory_threshold"]
+
     if max_alloc_diff > config["memory_threshold"]:
         logger.warning(
             f"Memory validation FAILED: {max_alloc_diff * 100:.2f}% > {config['memory_threshold'] * 100:.1f}%"
         )
-        # Add timing failure to convergence result
+        # Add to memory result
         results["passed"] = False
         results["failed_metrics"].append("max_alloc")
-        results["summary"] = f"Failed {len(results['failed_metrics'])} out of 2 tests"
-        results["max_alloc_diff"] = max_alloc_diff
-        results["memory_threshold"] = config["memory_threshold"]
     else:
-        results["passed"] = True
         logger.info(
             f"✓ Max Memory allocation passed: {max_alloc_diff * 100:.2f}% <= {config['memory_threshold'] * 100:.1f}%"
         )
 
-    alloc_diff = abs(current_alloc - golden_alloc) / golden_alloc if golden_alloc != 0 else abs(current_alloc)
-
-    logger.info(f"Alloc difference: {alloc_diff * 100:.2f}%")
-    logger.info(f"Memory threshold: {config['memory_threshold'] * 100:.1f}%")
-    logger.info(f"Current alloc: {current_alloc}")
-    logger.info(f"Golden alloc: {golden_alloc}")
-
     if alloc_diff > config["memory_threshold"]:
         logger.warning(f"Alloc validation FAILED: {alloc_diff * 100:.2f}% > {config['memory_threshold'] * 100:.1f}%")
         results["passed"] = False
         results["failed_metrics"].append("alloc")
-        results["summary"] = f"Failed {len(results['failed_metrics'])} out of 2 tests"
-        results["alloc_diff"] = alloc_diff
-        results["memory_threshold"] = config["memory_threshold"]
     else:
         logger.info(f"✓ Alloc validation passed: {alloc_diff * 100:.2f}% <= {config['memory_threshold'] * 100:.1f}%")
@@ -445,7 +474,7 @@ def validate_memory(
         logger.info("🎉 All memory validation tests PASSED!")
     else:
         results["summary"] = f"Failed {len(results['failed_metrics'])} out of 2 validation tests"
-        logger.error(f"❌ Convergence validation FAILED: {results['summary']}")
+        logger.error(f"❌ Memory validation FAILED: {results['summary']}")
 
     if wandb_run is not None:
         wandb_run.summary["memory_passed"] = results["passed"]
@@ -493,6 +522,7 @@ def calc_convergence_and_performance(
     performance_config: Dict[str, Any],
     memory_config: Dict[str, Any],
     wandb_run: Optional["wandb.Run"] = None,
+    _logger: logging.Logger = None,
 ):
     """
     Calculate convergence metrics and validate against golden values.
@@ -512,7 +542,9 @@ def calc_convergence_and_performance(
             low_loss_tolerance, final_loss_tolerance, max_outlier_ratio, outlier_threshold, skip_first_percent_loss
         wandb_run: An optional wandb run object to log metrics to
+        _logger: Logger to use; defaults to this module's logger if not provided
     """
 
+    _logger = _logger or logger
     if not HAVE_WANDB:
         raise ImportError("wandb is required for calculating perf and convergence metrics")
@@ -530,7 +562,7 @@ def calc_convergence_and_performance(
     golden_values_file_name = pathlib.Path(golden_values_path).name
     next_golden_values_path = os.path.join(assets_dir, "golden_values", golden_values_file_name)
     expected_golden_values_path = os.path.join(pathlib.Path(golden_values_path).parent, golden_values_file_name)
-    logger.info(f"Golden values path: {expected_golden_values_path}")
+    _logger.info(f"Golden values path: {expected_golden_values_path}")
 
     # Always write actuals into experiment directory
     write_golden_values_to_disk(
@@ -573,16 +605,17 @@ def calc_convergence_and_performance(
             f"You will need to add the golden values ({expected_golden_values_path}) "
             "into the repository before the next run."
         )
-        logger.error(error_msg)
+        _logger.error(error_msg)
         sys.exit(1)
 
-    logger.info("Found existing golden values file, performing convergence check")
+    _logger.info("Found existing golden values file, performing convergence check")
     with open(expected_golden_values_path, "r") as f:
         expected_golden_values = json.load(f)
 
     steps = []
     golden_train_loss = {}
     golden_iter_time = {}
+    golden_gpu_util = {}
     golden_alloc = None
     golden_max_alloc = None
     for key, value in expected_golden_values.items():
@@ -595,21 +628,22 @@ def calc_convergence_and_performance(
             steps.append(key)
             golden_train_loss[key] = value[loss_metric]
             golden_iter_time[key] = value[timing_metric]
+            golden_gpu_util[key] = value.get("GPU utilization")
 
     # Extract golden_lm_loss and golden_iter_time lists
-    logger.info(f"Comparing {len(steps)} training steps for convergence")
+    _logger.info(f"Comparing {len(steps)} training steps for convergence")
     steps = sorted(golden_train_loss.keys(), key=int)
 
     # check for convergence
     golden_train_loss_values = np.array([golden_train_loss[str(step)] for step in steps])
     current_train_loss_values = np.array([current_train_loss.get(s, float("nan")) for s in steps])
-    logger.info(f"Current loss values (last 15): {current_train_loss_values[-15:]}")
-    logger.info(f"Golden loss values (last 15): {golden_train_loss_values[-15:]}")
+    _logger.info(f"Current loss values (last 15): {current_train_loss_values[-15:]}")
+    _logger.info(f"Golden loss values (last 15): {golden_train_loss_values[-15:]}")
     convergence_result = validate_convergence(
         current_values=current_train_loss_values,
         golden_values=golden_train_loss_values,
         steps=steps,
-        logger=logger,
+        logger=_logger,
         config=convergence_config,
         wandb_run=wandb_run,
     )
@@ -622,40 +656,57 @@ def calc_convergence_and_performance(
 
     # check for performance
     golden_iter_time_values = np.array([golden_iter_time[str(step)] for step in steps])
     current_iter_time_values = np.array([current_iter_time.get(s, float("nan")) for s in steps])
-    logger.info(f"Current timing values (last 15): {current_iter_time_values[-15:]}")
-    logger.info(f"Golden timing values (last 15): {golden_iter_time_values[-15:]}")
+    # Use explicit None-check: dict.get(key, default) only applies the default when the key is
+    # absent; if the key exists but its value is None (e.g. "GPU utilization" missing from the
+    # golden file for that step), .get() returns None — not the default — creating an object
+    # array that breaks np.nanmean.
+    golden_gpu_util_values = np.array(
+        [float(v) if (v := golden_gpu_util.get(s)) is not None else float("nan") for s in steps]
+    )
+    current_gpu_util_values = np.array(
+        [float(v) if (v := current_gpu_util.get(s)) is not None else float("nan") for s in steps]
+    )
+    _logger.info(f"Current GPU util values (last 15): {current_gpu_util_values[-15:]}")
+    _logger.info(f"Golden GPU util values (last 15): {golden_gpu_util_values[-15:]}")
     performance_result = validate_performance(
-        current_values=current_iter_time_values,
-        golden_values=golden_iter_time_values,
+        current_gpu_util_values=current_gpu_util_values,
+        golden_gpu_util_values=golden_gpu_util_values,
         steps=steps,
-        logger=logger,
+        logger=_logger,
         config=performance_config,
         wandb_run=wandb_run,
     )
+    # Add iter-time averages for debugging (not used for pass/fail)
+    skip = max(1, int(len(steps) * performance_config.get("skip_first_percent_time", 0.1)))
+    performance_result["metrics"]["current_avg_iter_time_ms"] = float(np.nanmean(current_iter_time_values[skip:]))
+    performance_result["metrics"]["golden_avg_iter_time_ms"] = float(np.nanmean(golden_iter_time_values[skip:]))
 
     if not performance_result["passed"]:
+        direction = performance_result["metrics"]["direction"]
+        signed_diff = performance_result["metrics"]["signed_diff"]
         error_msg += f"Performance check failed. {performance_result['summary']}\n"
-        error_msg += f"Timing difference is greater than threshold: {performance_result['timing_diff'] * 100:.2f}% > {performance_config['timing_threshold'] * 100:.1f}%\n"
+        error_msg += (
+            f"GPU util {direction}: signed diff {signed_diff * 100:.2f}% > "
+            f"±{performance_config.get('timing_threshold', 0.05) * 100:.1f}%\n"
+        )
 
     # check for memory
     memory_metrics_missing = golden_alloc is None or golden_max_alloc is None
     if memory_metrics_missing:
-        logger.warning("Memory metrics (alloc, max_alloc) not found in golden values - skipping memory validation")
+        _logger.warning("Memory metrics (alloc, max_alloc) not found in golden values - skipping memory validation")
     else:
         memory_result = validate_memory(
             golden_alloc=golden_alloc,
             current_alloc=current_alloc,
             golden_max_alloc=golden_max_alloc,
             current_max_alloc=current_max_alloc,
-            logger=logger,
+            logger=_logger,
             wandb_run=wandb_run,
             config=memory_config,
         )
         if not memory_result["passed"]:
             error_msg += f"Memory check failed. {memory_result['summary']}\n"
-            if "max_alloc_diff" in memory_result:
-                error_msg += f"Max alloc difference: {memory_result['max_alloc_diff'] * 100:.2f}%\n"
-            if "alloc_diff" in memory_result:
-                error_msg += f"Alloc difference: {memory_result['alloc_diff'] * 100:.2f}%\n"
+            error_msg += f"Max alloc difference: {memory_result['metrics']['max_alloc_diff'] * 100:.2f}%\n"
+            error_msg += f"Alloc difference: {memory_result['metrics']['alloc_diff'] * 100:.2f}%\n"
             error_msg += f"Threshold: {memory_config['memory_threshold'] * 100:.1f}%\n"
 
     if wandb_run is not None:
@@ -668,6 +719,8 @@ def calc_convergence_and_performance(
                     "compare/current_iter_time": current_iter_time_values[i],
                     "compare/golden_lm_loss": golden_train_loss_values[i],
                     "compare/golden_iter_time": golden_iter_time_values[i],
+                    "compare/current_gpu_util": current_gpu_util_values[i],
+                    "compare/golden_gpu_util": golden_gpu_util_values[i],
                     "compare/current_grad_norm": current_grad_norm.get(steps[i], float("nan")),
                 }
             )
@@ -696,5 +749,5 @@ def calc_convergence_and_performance(
         error_msg += f'        "{alloc_metric}": {current_alloc},\n'
         error_msg += f'        "{max_alloc_metric}": {current_max_alloc}\n'
 
-    logger.info(f"Convergence check completed successfully for {model_family_name}_{model_recipe_name}")
+    _logger.info(f"Convergence check completed successfully for {model_family_name}_{model_recipe_name}")
     return has_validation_failures is False, error_msg
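
Reviewer note, illustrative only and not part of the patch: the fd-duplication trick used in setup_experiment.py above can be exercised standalone. In this sketch the "demo" logger name and the /dev/null redirect are stand-ins for wandb's stderr capture; nothing below is taken from the repository.

import logging
import os
import sys

logger = logging.getLogger("demo")
logger.setLevel(logging.DEBUG)

# Duplicate fd 2 now; the duplicate keeps pointing at the original stderr even
# after fd 2 itself is re-pointed elsewhere via os.dup2().
dup_file = os.fdopen(os.dup(2), "w", buffering=1)
handler = logging.StreamHandler(dup_file)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
logger.addHandler(handler)

# Simulate a library capturing stderr: fd 2 now points at /dev/null.
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, 2)

logger.info("still visible")         # reaches the terminal through the duplicated fd
print("swallowed", file=sys.stderr)  # goes to /dev/null via the redirected fd 2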
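
Reviewer note, illustrative only and not part of the patch: the signed-difference classification that validate_performance implements above, reduced to its core. The classify_gpu_util helper and the sample numbers are invented for this sketch; the patch reads real per-step values from the golden-values file and keeps the tolerance under the existing timing_threshold config key.

import numpy as np

def classify_gpu_util(current, golden, threshold=0.05, skip_frac=0.1):
    """Return 'regression', 'improvement', or 'pass' from the signed relative difference."""
    skip = max(1, int(len(current) * skip_frac))  # drop warmup iterations, as the patch does
    current_avg = float(np.nanmean(current[skip:]))
    golden_avg = float(np.nanmean(golden[skip:]))
    signed_diff = (current_avg - golden_avg) / golden_avg if golden_avg != 0 else 0.0
    if signed_diff < -threshold:
        return "regression"    # utilization dropped by more than the threshold
    if signed_diff > threshold:
        return "improvement"   # suspicious jump; the patch also treats this as a failure
    return "pass"

golden = np.full(20, 92.0)
assert classify_gpu_util(np.full(20, 91.0), golden) == "pass"         # ~1% below golden
assert classify_gpu_util(np.full(20, 80.0), golden) == "regression"   # ~13% below golden
assert classify_gpu_util(np.full(20, 99.0), golden) == "improvement"  # ~7.6% above golden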
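
Reviewer note, illustrative only and not part of the patch: a small sketch of the dict.get() pitfall that the None-check comment in calc_convergence_and_performance describes. The golden_gpu_util literal is made up; it only shows why an existing None value has to be converted to NaN explicitly before the array is fed to np.nanmean.

import numpy as np

golden_gpu_util = {"10": 91.5, "20": None}  # "GPU utilization" missing for step 20

# dict.get(key, default) only applies the default when the key is absent; an existing
# None slips through and yields a dtype=object array that np.nanmean cannot handle.
naive = np.array([golden_gpu_util.get(s, float("nan")) for s in ["10", "20"]])
print(naive.dtype)  # object

# Explicit None check, as in the patch: None becomes NaN and the array stays float64.
safe = np.array(
    [float(v) if (v := golden_gpu_util.get(s)) is not None else float("nan") for s in ["10", "20"]]
)
print(safe.dtype, np.nanmean(safe))  # float64 91.5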