diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 7080dfef009d..7755f843e063 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -109,7 +109,9 @@ def distribution( @staticmethod def gauge( - namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingGauge': + namespace: Union[Type, str], + name: str, + process_wide: bool = False) -> 'Metrics.DelegatingGauge': """Obtains or creates a Gauge metric. Gauge metrics are restricted to integer-only values. @@ -117,12 +119,15 @@ def gauge( Args: namespace: A class or string that gives the namespace to a metric name: A string that gives a unique name to a metric + process_wide: Whether or not the metric is specific to the current bundle + or should be calculated for the entire process. Returns: A Distribution object. """ namespace = Metrics.get_namespace(namespace) - return Metrics.DelegatingGauge(MetricName(namespace, name)) + return Metrics.DelegatingGauge( + MetricName(namespace, name), process_wide=process_wide) @staticmethod def string_set( @@ -216,9 +221,13 @@ def __init__(self, metric_name: MetricName) -> None: class DelegatingGauge(Gauge): """Metrics Gauge that Delegates functionality to MetricsEnvironment.""" - def __init__(self, metric_name: MetricName) -> None: + def __init__( + self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[method-assign] + self.set = MetricUpdater( # type: ignore[method-assign] + cells.GaugeCell, + metric_name, + process_wide=process_wide) class DelegatingStringSet(StringSet): """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" diff --git a/sdks/python/apache_beam/ml/inference/model_manager.py b/sdks/python/apache_beam/ml/inference/model_manager.py index 186611984df0..7d35c3d2c67a 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager.py +++ b/sdks/python/apache_beam/ml/inference/model_manager.py @@ -45,6 +45,7 @@ import torch from scipy.optimize import nnls +from apache_beam.metrics.metric import Metrics from apache_beam.utils import multi_process_shared logger = logging.getLogger(__name__) @@ -205,6 +206,10 @@ def set_initial_estimate(self, model_tag: str, cost: float): with self._lock: self.estimates[model_tag] = cost self.known_models.add(model_tag) + Metrics.gauge( + "BeamML_ModelManager", + f"memory_estimate_mb_{model_tag}", + process_wide=True).set(int(cost)) self.logging_info("Initial Profile for %s: %s MB", model_tag, cost) def add_observation( @@ -291,6 +296,11 @@ def _solve(self): self.logging_info( "Updated Estimate for %s: %.1f MB", model, self.estimates[model]) + + Metrics.gauge( + "BeamML_ModelManager", + f"memory_estimate_mb_{model}", + process_wide=True).set(int(self.estimates[model])) self.logging_info("System Bias: %s MB", bias) except Exception as e: @@ -374,6 +384,18 @@ def __init__( self._monitor.start() + def _update_model_count_metric(self): + for tag, instances in self._models.items(): + Metrics.gauge( + "BeamML_ModelManager", f"num_loaded_models_{tag}", + process_wide=True).set(len(instances)) + + def _clear_all_model_metrics(self): + for tag in self._models: + Metrics.gauge( + "BeamML_ModelManager", f"num_loaded_models_{tag}", + process_wide=True).set(0) + def logging_info(self, message: str, *args): if self._verbose_logging: logger.info(message, *args) @@ -719,6 +741,7 @@ def _perform_eviction(self, key: str, tag: str, instance: Any, score: int): self._monitor.reset_peak() curr, _, _ = self._monitor.get_stats() self.logging_info("Resource Usage After Eviction: %.1f MB", curr) + self._update_model_count_metric() def _spawn_new_model( self, @@ -741,6 +764,7 @@ def _spawn_new_model( self._pending_reservations = max( 0.0, self._pending_reservations - est_cost) self._models[tag].append(instance) + self._update_model_count_metric() return instance except Exception as e: @@ -758,6 +782,7 @@ def _spawn_new_model( raise e def _delete_all_models(self): + self._clear_all_model_metrics() self._idle_lru.clear() for _, instances in self._models.items(): for instance in instances: diff --git a/sdks/python/apache_beam/ml/inference/model_manager_test.py b/sdks/python/apache_beam/ml/inference/model_manager_test.py index 270401857e04..f8f5691fea90 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager_test.py +++ b/sdks/python/apache_beam/ml/inference/model_manager_test.py @@ -26,6 +26,8 @@ from apache_beam.utils import multi_process_shared try: + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName from apache_beam.ml.inference.model_manager import GPUMonitor from apache_beam.ml.inference.model_manager import ModelManager from apache_beam.ml.inference.model_manager import ResourceEstimator @@ -335,6 +337,65 @@ def dummy_loader(): instance = self.manager.acquire_model(model_name, lambda: "model_instance") self.manager.release_model(model_name, instance) + def test_model_manager_metric_gauge(self): + """Test that gauge metrics are updated correctly.""" + tag1 = "model1" + tag2 = "model2" + + def _get_count_gauge_value(tag): + gauge = MetricsEnvironment.process_wide_container().get_gauge( + MetricName('BeamML_ModelManager', f'num_loaded_models_{tag}')) + return gauge.get_cumulative().value + + def _get_est_gauge_value(tag): + gauge = MetricsEnvironment.process_wide_container().get_gauge( + MetricName('BeamML_ModelManager', f'memory_estimate_mb_{tag}')) + return gauge.get_cumulative().value + + # Verify that initial estimates correctly export int metrics + self.manager._estimator.set_initial_estimate(tag1, 1000.5) + self.assertEqual(_get_est_gauge_value(tag1), 1000) + + self.manager._estimator.set_initial_estimate(tag2, 2000.9) + self.assertEqual(_get_est_gauge_value(tag2), 2000) + + # 1. Acquire a model + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_count_gauge_value(tag1), 1) + self.assertEqual(_get_est_gauge_value(tag1), 1000) + + # 2. Acquire another instance of same model + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_count_gauge_value(tag1), 2) + self.assertEqual(_get_est_gauge_value(tag1), 1000) + + # 3. Acquire a different model + self.manager.acquire_model( + tag2, lambda: MockModel(tag2, 2000.0, self.mock_monitor)) + self.assertEqual(_get_count_gauge_value(tag2), 1) + self.assertEqual(_get_est_gauge_value(tag2), 2000) + # tag1 count should remain 2 + self.assertEqual(_get_count_gauge_value(tag1), 2) + self.assertEqual(_get_est_gauge_value(tag1), 1000) + + # 4. Delete all models + self.manager._delete_all_models() + self.assertEqual(_get_count_gauge_value(tag1), 0) + self.assertEqual(_get_count_gauge_value(tag2), 0) + # Note: Memory estimates are intentionally not cleared on delete + self.assertEqual(_get_est_gauge_value(tag1), 1000) + self.assertEqual(_get_est_gauge_value(tag2), 2000) + + # 5. Repopulate and force reset + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_count_gauge_value(tag1), 1) + + self.manager._force_reset() + self.assertEqual(_get_count_gauge_value(tag1), 0) + def test_single_model_convergence_with_fluctuations(self): """ Tests that the estimator converges to the true usage with fluctuations.