From 725385f29cf56c857bdb52a2855fc9e2b76117bb Mon Sep 17 00:00:00 2001 From: AMOOOMA Date: Mon, 23 Feb 2026 05:47:24 +0000 Subject: [PATCH 1/3] Add num of models reporting --- sdks/python/apache_beam/metrics/metric.py | 17 +++++-- .../apache_beam/ml/inference/model_manager.py | 16 +++++++ .../ml/inference/model_manager_test.py | 44 +++++++++++++++++++ 3 files changed, 73 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 7080dfef009d..7755f843e063 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -109,7 +109,9 @@ def distribution( @staticmethod def gauge( - namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingGauge': + namespace: Union[Type, str], + name: str, + process_wide: bool = False) -> 'Metrics.DelegatingGauge': """Obtains or creates a Gauge metric. Gauge metrics are restricted to integer-only values. @@ -117,12 +119,15 @@ def gauge( Args: namespace: A class or string that gives the namespace to a metric name: A string that gives a unique name to a metric + process_wide: Whether or not the metric is specific to the current bundle + or should be calculated for the entire process. Returns: A Distribution object. """ namespace = Metrics.get_namespace(namespace) - return Metrics.DelegatingGauge(MetricName(namespace, name)) + return Metrics.DelegatingGauge( + MetricName(namespace, name), process_wide=process_wide) @staticmethod def string_set( @@ -216,9 +221,13 @@ def __init__(self, metric_name: MetricName) -> None: class DelegatingGauge(Gauge): """Metrics Gauge that Delegates functionality to MetricsEnvironment.""" - def __init__(self, metric_name: MetricName) -> None: + def __init__( + self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[method-assign] + self.set = MetricUpdater( # type: ignore[method-assign] + cells.GaugeCell, + metric_name, + process_wide=process_wide) class DelegatingStringSet(StringSet): """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" diff --git a/sdks/python/apache_beam/ml/inference/model_manager.py b/sdks/python/apache_beam/ml/inference/model_manager.py index 186611984df0..12d9459d6dbe 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager.py +++ b/sdks/python/apache_beam/ml/inference/model_manager.py @@ -45,6 +45,7 @@ import torch from scipy.optimize import nnls +from apache_beam.metrics.metric import Metrics from apache_beam.utils import multi_process_shared logger = logging.getLogger(__name__) @@ -374,6 +375,18 @@ def __init__( self._monitor.start() + def _update_model_count_metric(self): + for tag, instances in self._models.items(): + Metrics.gauge( + "BeamML_ModelManager", f"num_loaded_models_{tag}", + process_wide=True).set(len(instances)) + + def _clear_all_model_metrics(self): + for tag in self._models: + Metrics.gauge( + "BeamML_ModelManager", f"num_loaded_models_{tag}", + process_wide=True).set(0) + def logging_info(self, message: str, *args): if self._verbose_logging: logger.info(message, *args) @@ -719,6 +732,7 @@ def _perform_eviction(self, key: str, tag: str, instance: Any, score: int): self._monitor.reset_peak() curr, _, _ = self._monitor.get_stats() self.logging_info("Resource Usage After Eviction: %.1f MB", curr) + self._update_model_count_metric() def _spawn_new_model( self, @@ -741,6 +755,7 @@ def _spawn_new_model( self._pending_reservations = max( 0.0, self._pending_reservations - est_cost) self._models[tag].append(instance) + self._update_model_count_metric() return instance except Exception as e: @@ -758,6 +773,7 @@ def _spawn_new_model( raise e def _delete_all_models(self): + self._clear_all_model_metrics() self._idle_lru.clear() for _, instances in self._models.items(): for instance in instances: diff --git a/sdks/python/apache_beam/ml/inference/model_manager_test.py b/sdks/python/apache_beam/ml/inference/model_manager_test.py index 270401857e04..6915b39627b4 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager_test.py +++ b/sdks/python/apache_beam/ml/inference/model_manager_test.py @@ -29,6 +29,8 @@ from apache_beam.ml.inference.model_manager import GPUMonitor from apache_beam.ml.inference.model_manager import ModelManager from apache_beam.ml.inference.model_manager import ResourceEstimator + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName except ImportError as e: raise unittest.SkipTest("Model Manager dependencies are not installed") @@ -335,6 +337,48 @@ def dummy_loader(): instance = self.manager.acquire_model(model_name, lambda: "model_instance") self.manager.release_model(model_name, instance) + def test_model_manager_metric_gauge(self): + """Test that the model count gauges are updated correctly with tags.""" + tag1 = "model1" + tag2 = "model2" + self.manager._estimator.set_initial_estimate(tag1, 1000.0) + self.manager._estimator.set_initial_estimate(tag2, 1000.0) + + def _get_gauge_value(tag): + gauge = MetricsEnvironment.process_wide_container().get_gauge( + MetricName('BeamML_ModelManager', f'num_loaded_models_{tag}')) + return gauge.get_cumulative().value + + # 1. Acquire a model + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_gauge_value(tag1), 1) + + # 2. Acquire another instance of same model + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_gauge_value(tag1), 2) + + # 3. Acquire a different model + self.manager.acquire_model( + tag2, lambda: MockModel(tag2, 1000.0, self.mock_monitor)) + self.assertEqual(_get_gauge_value(tag2), 1) + # tag1 count should remain 2 + self.assertEqual(_get_gauge_value(tag1), 2) + + # 4. Delete all models + self.manager._delete_all_models() + self.assertEqual(_get_gauge_value(tag1), 0) + self.assertEqual(_get_gauge_value(tag2), 0) + + # 5. Repopulate and force reset + self.manager.acquire_model( + tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) + self.assertEqual(_get_gauge_value(tag1), 1) + + self.manager._force_reset() + self.assertEqual(_get_gauge_value(tag1), 0) + def test_single_model_convergence_with_fluctuations(self): """ Tests that the estimator converges to the true usage with fluctuations. From c8f2fe7f61bf89608616a47696b81fe69eea96a3 Mon Sep 17 00:00:00 2001 From: AMOOOMA Date: Tue, 24 Feb 2026 19:29:56 +0000 Subject: [PATCH 2/3] Export memory estimate as well --- .../apache_beam/ml/inference/model_manager.py | 9 ++++ .../ml/inference/model_manager_test.py | 43 +++++++++++++------ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/model_manager.py b/sdks/python/apache_beam/ml/inference/model_manager.py index 12d9459d6dbe..7d35c3d2c67a 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager.py +++ b/sdks/python/apache_beam/ml/inference/model_manager.py @@ -206,6 +206,10 @@ def set_initial_estimate(self, model_tag: str, cost: float): with self._lock: self.estimates[model_tag] = cost self.known_models.add(model_tag) + Metrics.gauge( + "BeamML_ModelManager", + f"memory_estimate_mb_{model_tag}", + process_wide=True).set(int(cost)) self.logging_info("Initial Profile for %s: %s MB", model_tag, cost) def add_observation( @@ -292,6 +296,11 @@ def _solve(self): self.logging_info( "Updated Estimate for %s: %.1f MB", model, self.estimates[model]) + + Metrics.gauge( + "BeamML_ModelManager", + f"memory_estimate_mb_{model}", + process_wide=True).set(int(self.estimates[model])) self.logging_info("System Bias: %s MB", bias) except Exception as e: diff --git a/sdks/python/apache_beam/ml/inference/model_manager_test.py b/sdks/python/apache_beam/ml/inference/model_manager_test.py index 6915b39627b4..ac8cf642b179 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager_test.py +++ b/sdks/python/apache_beam/ml/inference/model_manager_test.py @@ -338,46 +338,63 @@ def dummy_loader(): self.manager.release_model(model_name, instance) def test_model_manager_metric_gauge(self): - """Test that the model count gauges are updated correctly with tags.""" + """Test that gauge metrics are updated correctly.""" tag1 = "model1" tag2 = "model2" - self.manager._estimator.set_initial_estimate(tag1, 1000.0) - self.manager._estimator.set_initial_estimate(tag2, 1000.0) - def _get_gauge_value(tag): + def _get_count_gauge_value(tag): gauge = MetricsEnvironment.process_wide_container().get_gauge( MetricName('BeamML_ModelManager', f'num_loaded_models_{tag}')) return gauge.get_cumulative().value + def _get_est_gauge_value(tag): + gauge = MetricsEnvironment.process_wide_container().get_gauge( + MetricName('BeamML_ModelManager', f'memory_estimate_mb_{tag}')) + return gauge.get_cumulative().value + + # Verify that initial estimates correctly export int metrics + self.manager._estimator.set_initial_estimate(tag1, 1000.5) + self.assertEqual(_get_est_gauge_value(tag1), 1000) + + self.manager._estimator.set_initial_estimate(tag2, 2000.9) + self.assertEqual(_get_est_gauge_value(tag2), 2000) + # 1. Acquire a model self.manager.acquire_model( tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) - self.assertEqual(_get_gauge_value(tag1), 1) + self.assertEqual(_get_count_gauge_value(tag1), 1) + self.assertEqual(_get_est_gauge_value(tag1), 1000) # 2. Acquire another instance of same model self.manager.acquire_model( tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) - self.assertEqual(_get_gauge_value(tag1), 2) + self.assertEqual(_get_count_gauge_value(tag1), 2) + self.assertEqual(_get_est_gauge_value(tag1), 1000) # 3. Acquire a different model self.manager.acquire_model( - tag2, lambda: MockModel(tag2, 1000.0, self.mock_monitor)) - self.assertEqual(_get_gauge_value(tag2), 1) + tag2, lambda: MockModel(tag2, 2000.0, self.mock_monitor)) + self.assertEqual(_get_count_gauge_value(tag2), 1) + self.assertEqual(_get_est_gauge_value(tag2), 2000) # tag1 count should remain 2 - self.assertEqual(_get_gauge_value(tag1), 2) + self.assertEqual(_get_count_gauge_value(tag1), 2) + self.assertEqual(_get_est_gauge_value(tag1), 1000) # 4. Delete all models self.manager._delete_all_models() - self.assertEqual(_get_gauge_value(tag1), 0) - self.assertEqual(_get_gauge_value(tag2), 0) + self.assertEqual(_get_count_gauge_value(tag1), 0) + self.assertEqual(_get_count_gauge_value(tag2), 0) + # Note: Memory estimates are intentionally not cleared on delete + self.assertEqual(_get_est_gauge_value(tag1), 1000) + self.assertEqual(_get_est_gauge_value(tag2), 2000) # 5. Repopulate and force reset self.manager.acquire_model( tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor)) - self.assertEqual(_get_gauge_value(tag1), 1) + self.assertEqual(_get_count_gauge_value(tag1), 1) self.manager._force_reset() - self.assertEqual(_get_gauge_value(tag1), 0) + self.assertEqual(_get_count_gauge_value(tag1), 0) def test_single_model_convergence_with_fluctuations(self): """ From d3500d6e73b189a3cf586b763fbfdc747690e628 Mon Sep 17 00:00:00 2001 From: AMOOOMA Date: Tue, 24 Feb 2026 22:48:48 +0000 Subject: [PATCH 3/3] Fix lint --- sdks/python/apache_beam/ml/inference/model_manager_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/model_manager_test.py b/sdks/python/apache_beam/ml/inference/model_manager_test.py index ac8cf642b179..f8f5691fea90 100644 --- a/sdks/python/apache_beam/ml/inference/model_manager_test.py +++ b/sdks/python/apache_beam/ml/inference/model_manager_test.py @@ -26,11 +26,11 @@ from apache_beam.utils import multi_process_shared try: + from apache_beam.metrics.execution import MetricsEnvironment + from apache_beam.metrics.metricbase import MetricName from apache_beam.ml.inference.model_manager import GPUMonitor from apache_beam.ml.inference.model_manager import ModelManager from apache_beam.ml.inference.model_manager import ResourceEstimator - from apache_beam.metrics.execution import MetricsEnvironment - from apache_beam.metrics.metricbase import MetricName except ImportError as e: raise unittest.SkipTest("Model Manager dependencies are not installed")