17 changes: 13 additions & 4 deletions sdks/python/apache_beam/metrics/metric.py
@@ -109,20 +109,25 @@ def distribution(

@staticmethod
def gauge(
namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingGauge':
namespace: Union[Type, str],
name: str,
process_wide: bool = False) -> 'Metrics.DelegatingGauge':
"""Obtains or creates a Gauge metric.

Gauge metrics are restricted to integer-only values.

Args:
namespace: A class or string that gives the namespace to a metric
name: A string that gives a unique name to a metric
process_wide: If False (the default), the metric is scoped to the
current bundle; if True, it is tracked for the entire process.

Returns:
A Gauge object.
"""
namespace = Metrics.get_namespace(namespace)
return Metrics.DelegatingGauge(MetricName(namespace, name))
return Metrics.DelegatingGauge(
MetricName(namespace, name), process_wide=process_wide)

@staticmethod
def string_set(
@@ -216,9 +221,13 @@ def __init__(self, metric_name: MetricName) -> None:

class DelegatingGauge(Gauge):
"""Metrics Gauge that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
def __init__(
self, metric_name: MetricName, process_wide: bool = False) -> None:
super().__init__(metric_name)
self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[method-assign]
self.set = MetricUpdater( # type: ignore[method-assign]
cells.GaugeCell,
metric_name,
process_wide=process_wide)

class DelegatingStringSet(StringSet):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
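For context, a minimal usage sketch of the extended gauge API. The namespace and metric names below are illustrative, not part of Beam; the only new surface is the process_wide keyword added above.

from apache_beam.metrics.metric import Metrics

# Bundle-scoped gauge: the default, unchanged behavior.
bundle_gauge = Metrics.gauge("MyNamespace", "per_bundle_value")
bundle_gauge.set(42)

# Process-wide gauge: recorded against the worker's process-wide metrics
# container rather than the current bundle's container.
process_gauge = Metrics.gauge(
    "MyNamespace", "process_value", process_wide=True)
process_gauge.set(42)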
25 changes: 25 additions & 0 deletions sdks/python/apache_beam/ml/inference/model_manager.py
@@ -45,6 +45,7 @@
import torch
from scipy.optimize import nnls

from apache_beam.metrics.metric import Metrics
from apache_beam.utils import multi_process_shared

logger = logging.getLogger(__name__)
@@ -205,6 +206,10 @@ def set_initial_estimate(self, model_tag: str, cost: float):
with self._lock:
self.estimates[model_tag] = cost
self.known_models.add(model_tag)
Metrics.gauge(
"BeamML_ModelManager",
f"memory_estimate_mb_{model_tag}",
process_wide=True).set(int(cost))
self.logging_info("Initial Profile for %s: %s MB", model_tag, cost)

def add_observation(
@@ -291,6 +296,11 @@ def _solve(self):

self.logging_info(
"Updated Estimate for %s: %.1f MB", model, self.estimates[model])

Metrics.gauge(
"BeamML_ModelManager",
f"memory_estimate_mb_{model}",
process_wide=True).set(int(self.estimates[model]))
self.logging_info("System Bias: %s MB", bias)

except Exception as e:
Expand Down Expand Up @@ -374,6 +384,18 @@ def __init__(

self._monitor.start()

def _update_model_count_metric(self):
for tag, instances in self._models.items():
Metrics.gauge(
"BeamML_ModelManager", f"num_loaded_models_{tag}",
process_wide=True).set(len(instances))

def _clear_all_model_metrics(self):
for tag in self._models:
Metrics.gauge(
"BeamML_ModelManager", f"num_loaded_models_{tag}",
process_wide=True).set(0)

def logging_info(self, message: str, *args):
if self._verbose_logging:
logger.info(message, *args)
Expand Down Expand Up @@ -719,6 +741,7 @@ def _perform_eviction(self, key: str, tag: str, instance: Any, score: int):
self._monitor.reset_peak()
curr, _, _ = self._monitor.get_stats()
self.logging_info("Resource Usage After Eviction: %.1f MB", curr)
self._update_model_count_metric()

def _spawn_new_model(
self,
@@ -741,6 +764,7 @@ def _spawn_new_model(
self._pending_reservations = max(
0.0, self._pending_reservations - est_cost)
self._models[tag].append(instance)
self._update_model_count_metric()
return instance

except Exception as e:
@@ -758,6 +782,7 @@ def _spawn_new_model(
raise e

def _delete_all_models(self):
self._clear_all_model_metrics()
self._idle_lru.clear()
for _, instances in self._models.items():
for instance in instances:
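With the changes above, a ModelManager exports two families of process-wide gauges, one gauge per model tag: num_loaded_models_<tag> for the current instance count and memory_estimate_mb_<tag> for the estimator's cost estimate. A hedged sketch of the resulting layout; the tags and values here are hypothetical.

from apache_beam.metrics.metric import Metrics

# Hypothetical state: two loaded instances of "model1", one of "model2".
models = {"model1": ["inst_a", "inst_b"], "model2": ["inst_c"]}

for tag, instances in models.items():
    # num_loaded_models_<tag>: how many instances are currently loaded.
    Metrics.gauge(
        "BeamML_ModelManager",
        f"num_loaded_models_{tag}",
        process_wide=True).set(len(instances))

# memory_estimate_mb_<tag>: the current cost estimate, truncated to an
# int because gauge metrics are integer-only.
Metrics.gauge(
    "BeamML_ModelManager",
    "memory_estimate_mb_model1",
    process_wide=True).set(int(1000.5))  # surfaces as 1000

Note the design choice exercised by the test below: on _delete_all_models (and _force_reset) the per-tag counts are set to 0 rather than removed, since a gauge keeps reporting its last value, while the memory estimates are intentionally left in place.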
61 changes: 61 additions & 0 deletions sdks/python/apache_beam/ml/inference/model_manager_test.py
@@ -26,6 +26,8 @@
from apache_beam.utils import multi_process_shared

try:
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName
from apache_beam.ml.inference.model_manager import GPUMonitor
from apache_beam.ml.inference.model_manager import ModelManager
from apache_beam.ml.inference.model_manager import ResourceEstimator
@@ -335,6 +337,65 @@ def dummy_loader():
instance = self.manager.acquire_model(model_name, lambda: "model_instance")
self.manager.release_model(model_name, instance)

def test_model_manager_metric_gauge(self):
"""Test that gauge metrics are updated correctly."""
tag1 = "model1"
tag2 = "model2"

def _get_count_gauge_value(tag):
gauge = MetricsEnvironment.process_wide_container().get_gauge(
MetricName('BeamML_ModelManager', f'num_loaded_models_{tag}'))
return gauge.get_cumulative().value

def _get_est_gauge_value(tag):
gauge = MetricsEnvironment.process_wide_container().get_gauge(
MetricName('BeamML_ModelManager', f'memory_estimate_mb_{tag}'))
return gauge.get_cumulative().value

# Verify that initial estimates correctly export int metrics
self.manager._estimator.set_initial_estimate(tag1, 1000.5)
self.assertEqual(_get_est_gauge_value(tag1), 1000)

self.manager._estimator.set_initial_estimate(tag2, 2000.9)
self.assertEqual(_get_est_gauge_value(tag2), 2000)

# 1. Acquire a model
self.manager.acquire_model(
tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor))
self.assertEqual(_get_count_gauge_value(tag1), 1)
self.assertEqual(_get_est_gauge_value(tag1), 1000)

# 2. Acquire another instance of same model
self.manager.acquire_model(
tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor))
self.assertEqual(_get_count_gauge_value(tag1), 2)
self.assertEqual(_get_est_gauge_value(tag1), 1000)

# 3. Acquire a different model
self.manager.acquire_model(
tag2, lambda: MockModel(tag2, 2000.0, self.mock_monitor))
self.assertEqual(_get_count_gauge_value(tag2), 1)
self.assertEqual(_get_est_gauge_value(tag2), 2000)
# tag1 count should remain 2
self.assertEqual(_get_count_gauge_value(tag1), 2)
self.assertEqual(_get_est_gauge_value(tag1), 1000)

# 4. Delete all models
self.manager._delete_all_models()
self.assertEqual(_get_count_gauge_value(tag1), 0)
self.assertEqual(_get_count_gauge_value(tag2), 0)
# Note: Memory estimates are intentionally not cleared on delete
self.assertEqual(_get_est_gauge_value(tag1), 1000)
self.assertEqual(_get_est_gauge_value(tag2), 2000)

# 5. Repopulate and force reset
self.manager.acquire_model(
tag1, lambda: MockModel(tag1, 1000.0, self.mock_monitor))
self.assertEqual(_get_count_gauge_value(tag1), 1)

self.manager._force_reset()
self.assertEqual(_get_count_gauge_value(tag1), 0)

def test_single_model_convergence_with_fluctuations(self):
"""
Tests that the estimator converges to the true usage with fluctuations.
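Closing the loop, the read-back pattern the test's helpers use: a process-wide gauge's last value is available from the process-wide metrics container. A minimal sketch, with an illustrative metric name.

from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName

gauge = MetricsEnvironment.process_wide_container().get_gauge(
    MetricName("BeamML_ModelManager", "memory_estimate_mb_model1"))
# get_cumulative() carries the last value set; floats were truncated
# (not rounded) before export, so 1000.5 reads back as 1000.
print(gauge.get_cumulative().value)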