From b838ceeb36e5cdfbc625f2c91806130ddf284d5a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 21 Apr 2025 23:07:01 -0400 Subject: [PATCH 1/6] Add support and unit test for PyOD models Additionally, it includes: - A minor fix for error messages in the `specifiable` module. - Support for scoring offline detectors on a subset of features. --- .../ml/anomaly/detectors/pyod_adapter.py | 102 +++++++++++ .../ml/anomaly/detectors/pyod_adapter_test.py | 160 ++++++++++++++++++ .../apache_beam/ml/anomaly/specifiable.py | 4 +- .../apache_beam/ml/anomaly/transforms.py | 11 ++ 4 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py create mode 100644 sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py new file mode 100644 index 000000000000..e95b42f18065 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pickle +from typing import Any +from typing import Dict +from typing import Iterable +from typing import Optional +from typing import Sequence + +import numpy as np +from pyod.models.base import BaseDetector as PyODBaseDetector + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.anomaly.detectors.offline import OfflineDetector +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.thresholds import FixedThreshold +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import _PostProcessingModelHandler +from apache_beam.ml.inference.utils import _convert_to_result + +# Turn the used ModelHandler into specifiable +KeyedModelHandler = specifiable(KeyedModelHandler) +_PostProcessingModelHandler = specifiable(_PostProcessingModelHandler) + + +@specifiable +class PyODModelHandler(ModelHandler[beam.Row, + PredictionResult, + PyODBaseDetector]): + """Implementation of the ModelHandler interface for PyOD [#]_ Models. + + The ModelHandler processes input data as `beam.Row` objects. + + **NOTE:** This API and its implementation are currently under active + development and may not be backward compatible. + + Args: + model_uri: The URI specifying the location of the pickled PyOD model. + + .. 
[#] https://github.com/yzhao062/pyod + """ + def __init__(self, model_uri: str): + self._model_uri = model_uri + + def load_model(self) -> PyODBaseDetector: + file = FileSystems.open(self._model_uri, 'rb') + return pickle.load(file) + + def run_inference( + self, + batch: Sequence[beam.Row], + model: PyODBaseDetector, + inference_args: Optional[Dict[str, Any]] = None + ) -> Iterable[PredictionResult]: + np_batch = [] + for row in batch: + np_batch.append(np.fromiter(row, dtype=np.float64)) + + # stack a batch of samples into a 2-D array for better performance + vectorized_batch = np.stack(np_batch, axis=0) + predictions = model.decision_function(vectorized_batch) + + return _convert_to_result(batch, predictions, model_id=self._model_uri) + + +class PyODFactory(): + @staticmethod + def create_detector(model_uri: str, **kwargs) -> OfflineDetector: + """A utility function to create OfflineDetector for a PyOD model. + + **NOTE:** This API and its implementation are currently under active + development and may not be backward compatible. + + Args: + model_uri: The URI specifying the location of the pickled PyOD model. + **kwargs: Additional keyword arguments. + """ + model_handler = KeyedModelHandler( + PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( + OfflineDetector.score_prediction_adapter) + threshold = float(model_handler.load_model().threshold_) + detector = OfflineDetector( + model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs) + return detector diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py new file mode 100644 index 000000000000..52b9e331e123 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py @@ -0,0 +1,160 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import os.path +import pickle +import shutil +import tempfile +import unittest + +import numpy as np +from parameterized import parameterized + +import apache_beam as beam +from apache_beam.ml.anomaly.base import AnomalyPrediction +from apache_beam.ml.anomaly.base import AnomalyResult +from apache_beam.ml.anomaly.transforms import AnomalyDetection +from apache_beam.ml.anomaly.transforms_test import _keyed_result_is_equal_to +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +# Protect against environments where onnx and pytorch library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + import pyod + from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory + from pyod.models.iforest import IForest +except ImportError: + raise unittest.SkipTest('PyOD dependencies are not installed') + + +class PyODIForestTest(unittest.TestCase): + def setUp(self) -> None: + self.tmp_dir = tempfile.mkdtemp() + + seed = 1234 + model = IForest(random_state=seed) + model.fit(self.get_train_data()) + self.tmp_fn = os.path.join(self.tmp_dir, 'iforest_pickled') + + with open(self.tmp_fn, 'wb') as fp: + pickle.dump(model, fp) + + def tearDown(self) -> None: + shutil.rmtree(self.tmp_dir) + + def get_train_data(self): + return [ + np.array([1, 5], dtype="float32"), + np.array([2, 6], dtype="float32"), + np.array([3, 4], dtype="float32"), + np.array([2, 6], dtype="float32"), + np.array([10, 10], dtype="float32"), # need an outlier in training data + np.array([3, 4], dtype="float32"), + np.array([2, 6], dtype="float32"), + np.array([2, 6], dtype="float32"), + np.array([2, 5], dtype="float32"), + ] + + def get_test_data(self): + return [ + np.array([2, 6], dtype="float32"), + np.array([100, 100], dtype="float32"), + ] + + def get_test_data_with_target(self): + return [ + np.array([2, 6, 0], dtype="float32"), + np.array([100, 100, 1], dtype="float32"), + ] + + @parameterized.expand([True, False]) + def test_scoring_with_matched_features(self, with_target): + if with_target: + rows = [beam.Row(a=2, b=6, target=0), beam.Row(a=100, b=100, target=1)] + field_names = ["a", "b", "target"] + # The selected features should match the features used for training + detector = PyODFactory.create_detector(self.tmp_fn, features=["a", "b"]) + input_data = self.get_test_data_with_target() + else: + rows = [beam.Row(a=2, b=6), beam.Row(a=100, b=100)] + field_names = ["a", "b"] + detector = PyODFactory.create_detector(self.tmp_fn) + input_data = self.get_test_data() + + expected_out = [( + 0, + 
AnomalyResult( + example=rows[0], + predictions=[ + AnomalyPrediction( + model_id='OfflineDetector', + score=-0.20316164744828075, + label=0, + threshold=8.326672684688674e-17, + info='', + source_predictions=None) + ])), + ( + 0, + AnomalyResult( + example=rows[1], + predictions=[ + AnomalyPrediction( + model_id='OfflineDetector', + score=0.179516865091218, + label=1, + threshold=8.326672684688674e-17, + info='', + source_predictions=None) + ]))] + + options = PipelineOptions([]) + with beam.Pipeline(options=options) as p: + out = ( + p | beam.Create(input_data) + | beam.Map(lambda x: beam.Row(**dict(zip(field_names, map(int, x))))) + | beam.WithKeys(0) + | AnomalyDetection(detector=detector)) + assert_that(out, equal_to(expected_out, _keyed_result_is_equal_to)) + + def test_scoring_with_unmatched_features(self): + # The model is trained with two features: a, b, but the input features of + # scoring has one more feature (target). + # In this case, we should either get rid of the extra feature(s) from + # the scoring input or set `features` when creating the offline detector + # (see the `test_scoring_with_matched_features`) + detector = PyODFactory.create_detector(self.tmp_fn) + options = PipelineOptions([]) + p = beam.Pipeline(options=options) + _ = ( + p | beam.Create(self.get_test_data_with_target()) + | beam.Map( + lambda x: beam.Row(**dict(zip(["a", "b", "target"], map(int, x))))) + | beam.WithKeys(0) + | AnomalyDetection(detector=detector)) + + # This should raise a ValueError with message + # "X has 3 features, but IsolationForest is expecting 2 features as input." 
+ self.assertRaises(ValueError, p.run) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.WARNING) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py b/sdks/python/apache_beam/ml/anomaly/specifiable.py index 3a2baf434f9b..f13c94966720 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py @@ -116,7 +116,7 @@ def _specifiable_from_spec_helper(v, _run_init): # TODO: support spec treatment for more types if not isinstance(v, BUILTIN_TYPES_IN_SPEC): logging.warning( - "Type %s is not a recognized supported type for the" + "Type %s is not a recognized supported type for the " "specification. It will be included without conversion.", str(type(v))) return v @@ -142,7 +142,7 @@ def _specifiable_to_spec_helper(v): # TODO: support spec treatment for more types if not isinstance(v, BUILTIN_TYPES_IN_SPEC): logging.warning( - "Type %s is not a recognized supported type for the" + "Type %s is not a recognized supported type for the " "specification. 
It will be included without conversion.", str(type(v))) return v diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py b/sdks/python/apache_beam/ml/anomaly/transforms.py index ae8c78dfc0f1..23f57a130c1f 100644 --- a/sdks/python/apache_beam/ml/anomaly/transforms.py +++ b/sdks/python/apache_beam/ml/anomaly/transforms.py @@ -454,6 +454,13 @@ def _restore_and_convert( ]) return orig_key, (temp_key, result) + def _select_features(self, elem: Tuple[Any, + beam.Row]) -> Tuple[Any, beam.Row]: + assert self._offline_detector._features is not None + k, v = elem + row_dict = v._asdict() + return k, beam.Row(**{k:row_dict[k] for k in self._offline_detector._features}) # pylint: disable=line-too-long + def expand( self, input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]: @@ -468,6 +475,10 @@ def expand( rekeyed_model_input = input | "Rekey" >> beam.Map( lambda x: ((x[0], x[1][0], x[1][1]), x[1][1])) + if self._offline_detector._features is not None: + rekeyed_model_input = rekeyed_model_input | "Select Features" >> beam.Map( + self._select_features) + # ((orig_key, temp_key, beam.Row), AnomalyPrediction) rekeyed_model_output = ( rekeyed_model_input From d5f4ea612992aa67bd1bb033c81f3d7e7af035e7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 22 Apr 2025 14:59:44 -0400 Subject: [PATCH 2/6] Fix lints --- .../apache_beam/ml/anomaly/detectors/pyod_adapter.py | 10 ++++++---- sdks/python/apache_beam/ml/anomaly/transforms.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index e95b42f18065..35af6adccd71 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -37,8 +37,8 @@ from apache_beam.ml.inference.utils import _convert_to_result # Turn the used ModelHandler into specifiable -KeyedModelHandler = 
specifiable(KeyedModelHandler) -_PostProcessingModelHandler = specifiable(_PostProcessingModelHandler) +KeyedModelHandler = specifiable(KeyedModelHandler) # type: ignore[misc] +_PostProcessingModelHandler = specifiable(_PostProcessingModelHandler) # type: ignore[misc] @specifiable @@ -96,7 +96,9 @@ def create_detector(model_uri: str, **kwargs) -> OfflineDetector: model_handler = KeyedModelHandler( PyODModelHandler(model_uri=model_uri)).with_postprocess_fn( OfflineDetector.score_prediction_adapter) - threshold = float(model_handler.load_model().threshold_) + m = model_handler.load_model() + assert (isinstance(m, PyODBaseDetector)) + threshold = float(m.threshold_) detector = OfflineDetector( - model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs) + model_handler, threshold_criterion=FixedThreshold(threshold), **kwargs) # type: ignore[arg-type] return detector diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py b/sdks/python/apache_beam/ml/anomaly/transforms.py index 23f57a130c1f..f022c2c25b04 100644 --- a/sdks/python/apache_beam/ml/anomaly/transforms.py +++ b/sdks/python/apache_beam/ml/anomaly/transforms.py @@ -459,7 +459,7 @@ def _select_features(self, elem: Tuple[Any, assert self._offline_detector._features is not None k, v = elem row_dict = v._asdict() - return k, beam.Row(**{k:row_dict[k] for k in self._offline_detector._features}) # pylint: disable=line-too-long + return k, beam.Row(**{k: row_dict[k] for k in self._offline_detector._features}) # pylint: disable=line-too-long def expand( self, From d314f0004f6496ace3223e9fad111d5973e99d61 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 22 Apr 2025 15:41:19 -0400 Subject: [PATCH 3/6] More lints. 
--- .../python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py index 52b9e331e123..968197fa80d5 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py @@ -37,7 +37,6 @@ # Protect against environments where onnx and pytorch library is not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: - import pyod from apache_beam.ml.anomaly.detectors.pyod_adapter import PyODFactory from pyod.models.iforest import IForest except ImportError: From e84a284b66c8942c8dd00f6fa06c719e58c441da Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 22 Apr 2025 15:43:52 -0400 Subject: [PATCH 4/6] Add pyod dependency to ml_test extra --- sdks/python/setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index e1756176093b..97497a24e29e 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -489,6 +489,7 @@ def get_portability_package_data(): 'sentence-transformers', 'skl2onnx', 'pillow', + 'pyod', 'tensorflow', 'tensorflow-hub', 'tensorflow-transform', @@ -508,6 +509,7 @@ def get_portability_package_data(): 'sentence-transformers', 'skl2onnx', 'pillow', + 'pyod', 'tensorflow', 'tensorflow-hub', 'tf2onnx', From 3bd93c7dd081a6bc27e6e4720334d02551b8544b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 22 Apr 2025 16:15:52 -0400 Subject: [PATCH 5/6] Revise based on reviews. Fix lints. 
--- .../apache_beam/ml/anomaly/detectors/pyod_adapter.py | 2 +- .../ml/anomaly/detectors/pyod_adapter_test.py | 11 ++++++----- sdks/python/apache_beam/ml/anomaly/transforms.py | 5 ++++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 35af6adccd71..015c36acdf02 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -23,7 +23,6 @@ from typing import Sequence import numpy as np -from pyod.models.base import BaseDetector as PyODBaseDetector import apache_beam as beam from apache_beam.io.filesystems import FileSystems @@ -35,6 +34,7 @@ from apache_beam.ml.inference.base import PredictionResult from apache_beam.ml.inference.base import _PostProcessingModelHandler from apache_beam.ml.inference.utils import _convert_to_result +from pyod.models.base import BaseDetector as PyODBaseDetector # Turn the used ModelHandler into specifiable KeyedModelHandler = specifiable(KeyedModelHandler) # type: ignore[misc] diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py index 968197fa80d5..bb83e1aeca1c 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter_test.py @@ -50,9 +50,9 @@ def setUp(self) -> None: seed = 1234 model = IForest(random_state=seed) model.fit(self.get_train_data()) - self.tmp_fn = os.path.join(self.tmp_dir, 'iforest_pickled') + self.pickled_model_uri = os.path.join(self.tmp_dir, 'iforest_pickled') - with open(self.tmp_fn, 'wb') as fp: + with open(self.pickled_model_uri, 'wb') as fp: pickle.dump(model, fp) def tearDown(self) -> None: @@ -89,12 +89,13 @@ def test_scoring_with_matched_features(self, with_target): rows = [beam.Row(a=2, b=6, target=0), 
beam.Row(a=100, b=100, target=1)] field_names = ["a", "b", "target"] # The selected features should match the features used for training - detector = PyODFactory.create_detector(self.tmp_fn, features=["a", "b"]) + detector = PyODFactory.create_detector( + self.pickled_model_uri, features=["a", "b"]) input_data = self.get_test_data_with_target() else: rows = [beam.Row(a=2, b=6), beam.Row(a=100, b=100)] field_names = ["a", "b"] - detector = PyODFactory.create_detector(self.tmp_fn) + detector = PyODFactory.create_detector(self.pickled_model_uri) input_data = self.get_test_data() expected_out = [( @@ -139,7 +140,7 @@ def test_scoring_with_unmatched_features(self): # In this case, we should either get rid of the extra feature(s) from # the scoring input or set `features` when creating the offline detector # (see the `test_scoring_with_matched_features`) - detector = PyODFactory.create_detector(self.tmp_fn) + detector = PyODFactory.create_detector(self.pickled_model_uri) options = PipelineOptions([]) p = beam.Pipeline(options=options) _ = ( diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py b/sdks/python/apache_beam/ml/anomaly/transforms.py index f022c2c25b04..c497421be02d 100644 --- a/sdks/python/apache_beam/ml/anomaly/transforms.py +++ b/sdks/python/apache_beam/ml/anomaly/transforms.py @@ -459,7 +459,10 @@ def _select_features(self, elem: Tuple[Any, assert self._offline_detector._features is not None k, v = elem row_dict = v._asdict() - return k, beam.Row(**{k: row_dict[k] for k in self._offline_detector._features}) # pylint: disable=line-too-long + return ( + k, + beam.Row(**{k: row_dict[k] + for k in self._offline_detector._features})) def expand( self, From 8f1f965091b47cdba06b8001dba78e0ed38bb737 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 22 Apr 2025 19:45:54 -0400 Subject: [PATCH 6/6] Fix failed tests due to the side effect of lazy init on model handlers. 
--- .../apache_beam/ml/anomaly/detectors/pyod_adapter.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py index 015c36acdf02..f8df78261b90 100644 --- a/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py +++ b/sdks/python/apache_beam/ml/anomaly/detectors/pyod_adapter.py @@ -36,9 +36,15 @@ from apache_beam.ml.inference.utils import _convert_to_result from pyod.models.base import BaseDetector as PyODBaseDetector -# Turn the used ModelHandler into specifiable -KeyedModelHandler = specifiable(KeyedModelHandler) # type: ignore[misc] -_PostProcessingModelHandler = specifiable(_PostProcessingModelHandler) # type: ignore[misc] +# Turn the used ModelHandler into specifiable, but without lazy init. +KeyedModelHandler = specifiable( # type: ignore[misc] + KeyedModelHandler, + on_demand_init=False, + just_in_time_init=False) +_PostProcessingModelHandler = specifiable( # type: ignore[misc] + _PostProcessingModelHandler, + on_demand_init=False, + just_in_time_init=False) @specifiable