diff --git a/README.md b/README.md
index 289a4ee..fe2cf02 100644
--- a/README.md
+++ b/README.md
@@ -36,18 +36,54 @@ To install the `energy-fault-detector` package, run: `pip install energy-fault-d
 ## Quick fault detection
 
-For a quick demo on a specific dataset, run:
+The `quick_fault_detector` CLI now supports dedicated training and prediction workflows:
 
-```quick_fault_detector ```
+- **Train and evaluate a model** (default mode). This trains a new autoencoder, evaluates it on the provided
+  test slice, and reports where the model artefacts were stored.
 
-For more options, run ```quick_fault_detector -h```.
+  ```bash
+  quick_fault_detector --mode train [--options options.yaml]
+  ```
 
-For an example using one of the CARE2Compare datasets, run:
-```quick_fault_detector --c2c_example```
+- **Run predictions with an existing model**. Supply the dataset to score alongside the directory that contains the
+  saved model files returned from a previous training run.
+
+  ```bash
+  quick_fault_detector --mode predict --model_path <model_dir> [--options options.yaml]
+  ```
+
+  The `--model_path` argument is mandatory in predict mode.
+
+Prediction artefacts (anomaly scores, reconstructions, and detected events) are written to the directory specified by
+`--results_dir` (defaults to `./results`). For an example using one of the CARE2Compare datasets, run:
+
+```bash
+quick_fault_detector --c2c_example
+```
 
 For more information, have a look at the notebook
 [Quick Failure Detection](./notebooks/Example%20-%20Quick%20Failure%20Detection.ipynb)
 
+## REST prediction API
+
+The project ships with a lightweight FastAPI application that exposes the prediction workflow via HTTP. The service
+resolves models by name and version using the directory structure described in
+[`energy_fault_detector/api/service_config.yaml`](energy_fault_detector/api/service_config.yaml).
+
+Start the API with:
+
+```bash
+uvicorn energy_fault_detector.api.app:app --reload
+```
+
+By default the service reads its configuration from the bundled `service_config.yaml`. Set the
+`EFD_SERVICE_CONFIG` environment variable to point to a custom YAML file when you want to adapt the model root
+directory, override default ignore patterns, or tweak other runtime parameters. Predictions are triggered with a `POST`
+request to `/predict` and expect a JSON payload containing at least the `model_name` and `data_path` fields. Optional
+fields such as `model_version`, `ignore_features`, and `asset_name` refine which artefacts are used and how the results
+are stored.
+
+
 ## Fault detection in 5 lines of code
 
 ```python
diff --git a/asset_dataset_splitter.py b/asset_dataset_splitter.py
new file mode 100644
index 0000000..f5faf8c
--- /dev/null
+++ b/asset_dataset_splitter.py
@@ -0,0 +1,224 @@
+"""Utility for splitting asset datasets into train and prediction CSV files.
+
+The module provides a command line interface that accepts the path to a
+directory containing ``.csv`` files separated by semicolons. Each file is
+expected to contain records for one or more assets identified by the
+``asset_id`` column.
+
+The script creates two output files per asset:
+
+``train_<asset_id>.csv``
+    Contains all records marked as ``train`` in the ``train_test`` column and
+    all records whose ``status_type_id`` is either 0 or 2.
+
+``predict_<asset_id>.csv``
+    Contains all records whose ``status_type_id`` is 1, 3, 4 or 5 and all
+    records whose ``train_test`` value is ``prediction`` regardless of the
+    ``status_type_id`` value.
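+
+Example invocation (``data/`` and ``splits/`` are illustrative paths; the
+optional ``--output-dir`` defaults to the input directory)::
+
+    python asset_dataset_splitter.py data/ --output-dir splits/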
+ +Before saving, the script removes helper columns as well as columns that +contain ``_max``, ``_min`` or ``_std`` from the resulting files. +""" + +from __future__ import annotations + +import argparse +from pathlib import Path +from typing import Dict, Iterable, MutableMapping + +import pandas as pd + +import csv + + +STATUS_TYPE_NORMAL = {"0", "2"} +STATUS_TYPE_ANOMALY = {"1", "3", "4", "5"} +DROP_COLUMNS = { + "asset_id", + "train_test", + "train_test_bool", + "status_type_id", + "status_type_bool", +} +DROP_COLUMN_SUBSTRINGS = ("_max", "_min", "_std") + + +def _detect_delimiter(csv_file: Path) -> str: + """Return the detected delimiter for ``csv_file`` with ``;`` as fallback.""" + + with open(csv_file, "r", encoding="utf-8", errors="ignore") as handle: + sample = handle.read(2048) + sniffer = csv.Sniffer() + try: + dialect = sniffer.sniff(sample) + return dialect.delimiter + except csv.Error: + return ";" + + +def _iter_asset_frames( + csv_file: Path, *, chunksize: int | None = 100_000 +) -> Iterable[tuple[str, pd.DataFrame]]: + """Yield (asset_id, dataframe) pairs from ``csv_file`` without loading everything into memory.""" + + delimiter = _detect_delimiter(csv_file) + reader = pd.read_csv(csv_file, sep=delimiter, dtype=str, chunksize=chunksize) + + # ``pd.read_csv`` returns ``DataFrame`` when ``chunksize`` is ``None``. + if isinstance(reader, pd.DataFrame): + reader = [reader] + + for chunk in reader: + if chunk.empty: + continue + + if "asset_id" not in chunk.columns: + raise ValueError( + f"File '{csv_file}' does not contain required column 'asset_id'." + ) + + for asset_id, group in chunk.groupby("asset_id", sort=False): + yield str(asset_id), group.reset_index(drop=True) + + +def _clean_columns(df: pd.DataFrame) -> pd.DataFrame: + """Remove helper columns and columns containing ``_max``/``_min``/``_std``.""" + + to_drop = [col for col in df.columns if col in DROP_COLUMNS] + to_drop.extend( + col for col in df.columns if any(substr in col for substr in DROP_COLUMN_SUBSTRINGS) + ) + return df.drop(columns=to_drop, errors="ignore") + + +def _get_lowercase_series(df: pd.DataFrame, column: str) -> pd.Series: + series = df.get(column) + if series is None: + return pd.Series("", index=df.index, dtype=object) + return series.fillna("").astype(str).str.lower() + + +def _get_status_series(df: pd.DataFrame) -> pd.Series: + series = df.get("status_type_id") + if series is None: + return pd.Series("", index=df.index, dtype=object) + return series.fillna("").astype(str) + + +def _build_train_frame(df: pd.DataFrame) -> pd.DataFrame: + """Filter rows belonging to the training split.""" + + train_series = _get_lowercase_series(df, "train_test") + status_series = _get_status_series(df) + + is_train = train_series == "train" + is_normal_status = status_series.isin(STATUS_TYPE_NORMAL) + return df[is_train | is_normal_status].copy() + + +def _build_predict_frame(df: pd.DataFrame) -> pd.DataFrame: + """Filter rows belonging to the prediction split.""" + + train_series = _get_lowercase_series(df, "train_test") + status_series = _get_status_series(df) + + is_prediction = train_series == "prediction" + is_anomaly_status = status_series.isin(STATUS_TYPE_ANOMALY) + return df[is_prediction | is_anomaly_status].copy() + + +def _append_asset_frames( + asset_id: str, + df: pd.DataFrame, + output_dir: Path, + header_written: MutableMapping[Path, bool], +) -> None: + """Append data for ``asset_id`` to the corresponding train/predict CSV files.""" + + if df.empty: + return + + train_df = 
_clean_columns(_build_train_frame(df)) + predict_df = _clean_columns(_build_predict_frame(df)) + + if not train_df.empty: + train_path = output_dir / f"train_{asset_id}.csv" + train_df.to_csv( + train_path, + sep=";", + index=False, + mode="a", + header=not header_written.get(train_path, False), + ) + header_written[train_path] = True + + if not predict_df.empty: + predict_path = output_dir / f"predict_{asset_id}.csv" + predict_df.to_csv( + predict_path, + sep=";", + index=False, + mode="a", + header=not header_written.get(predict_path, False), + ) + header_written[predict_path] = True + + +def split_asset_datasets( + input_dir: Path, + output_dir: Path | None = None, + *, + chunksize: int | None = 100_000, +) -> None: + """Split datasets per asset into train and prediction CSV files. + + Args: + input_dir: Directory containing the source ``.csv`` files. + output_dir: Optional directory to store the results. When ``None`` the + input directory is used. + chunksize: Maximum number of rows per chunk read from the source files. + ``None`` loads the entire file at once. + """ + + if not input_dir.is_dir(): + raise NotADirectoryError(f"Input directory '{input_dir}' does not exist or is not a directory") + + output_dir = output_dir or input_dir + output_dir.mkdir(parents=True, exist_ok=True) + + # Remove previously generated files to avoid appending to stale data. + for existing_file in output_dir.glob("train_*.csv"): + existing_file.unlink() + for existing_file in output_dir.glob("predict_*.csv"): + existing_file.unlink() + + header_written: Dict[Path, bool] = {} + + for csv_file in sorted(input_dir.glob("*.csv")): + for asset_id, asset_df in _iter_asset_frames(csv_file, chunksize=chunksize): + _append_asset_frames(asset_id, asset_df, output_dir, header_written) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Split asset datasets into train and prediction files") + parser.add_argument( + "input_dir", + type=Path, + help="Path to the directory containing the source CSV files", + ) + parser.add_argument( + "--output-dir", + type=Path, + default=None, + help="Optional output directory. Defaults to the input directory.", + ) + return parser.parse_args() + + +def main() -> None: + args = _parse_args() + split_asset_datasets(args.input_dir, args.output_dir) + + +if __name__ == "__main__": + main() diff --git a/docs/config_example.yaml b/docs/config_example.yaml index 3dd8c39..8a16532 100644 --- a/docs/config_example.yaml +++ b/docs/config_example.yaml @@ -63,4 +63,11 @@ root_cause_analysis: # (optional) if not specified, no root_cause_analysis (ARC alpha: 0.8 init_x_bias: recon num_iter: 200 - + ignore_features: + # Patterns apply to anomaly scoring and ARCANA. Wildcards such as "windspeed*" are supported. + # list exact column names or use wildcards such as "windspeed*" + - wind_speed_59_avg + - power_58_avg + - wind_speed_60_avg + - wind_speed_61_avg + - power_62_avg \ No newline at end of file diff --git a/docs/usage_examples.rst b/docs/usage_examples.rst index ed70e09..f32488e 100644 --- a/docs/usage_examples.rst +++ b/docs/usage_examples.rst @@ -98,6 +98,42 @@ algorithm. An example: .. include:: config_example.yaml :literal: +To keep specific sensors out of both anomaly scoring *and* ARCANA you can edit the configuration YAML that you pass to +:py:obj:`Config ` (for example ``configs/base_config.yaml`` or a copy of it). Inside +the ``root_cause_analysis`` section add or update the optional ``ignore_features`` list. 
Any column names listed here are
+zeroed before the anomaly score is calculated and remain fixed during ARCANA optimisation, so they never trigger
+anomalies and are never suggested as a root cause.
+
+Column names must match the data frame headers used during training or prediction. You can either provide the exact
+column name or use Unix shell-style wildcards to target multiple columns at once (for example ``windspeed*`` matches
+``windspeed_avg`` and ``windspeed_peak``). If a configured pattern does not match any columns, a warning is logged so
+you can correct the entry.
+
+.. code-block:: yaml
+
+    root_cause_analysis:
+      ignore_features:
+        - windspeed*
+        - output_power
+
+When the configuration above is used, wind speed signals such as ``windspeed_avg`` are kept for model training but their
+reconstruction error is ignored for anomaly scores and in ARCANA runs. The same YAML file can be supplied when calling
+:py:meth:`FaultDetector.fit <energy_fault_detector.fault_detector.FaultDetector.fit>` for training or
+:py:meth:`FaultDetector.predict <energy_fault_detector.fault_detector.FaultDetector.predict>` for inference so the same
+set of exclusions is applied in both stages.
+
 To update the configuration 'on the fly' (for example for hyperparameter optimization), you provide a new
 configuration dictionary via the ``update_config`` method:
diff --git a/energy_fault_detector/anomaly_scores/rmse_score.py b/energy_fault_detector/anomaly_scores/rmse_score.py
index 5bfc0a4..f5d3e23 100644
--- a/energy_fault_detector/anomaly_scores/rmse_score.py
+++ b/energy_fault_detector/anomaly_scores/rmse_score.py
@@ -49,8 +49,22 @@ def fit(self, x: DataType, y: Optional[pd.Series] = None) -> 'RMSEScore':
 
         if self.scale:
             # fitted attributes need trailing underscore - and are not initialized
-            self.std_x_: np.array = np.std(x, axis=0)
-            self.mean_x_: np.array = np.mean(x, axis=0)
+            data = self._to_numpy(x)
+
+            if data.ndim == 1:
+                data = data.reshape(-1, 1)
+
+            n_samples = data.shape[0]
+            if n_samples == 0:
+                raise ValueError("Cannot fit RMSEScore on empty data.")
+
+            # Calculate mean and standard deviation without allocating an
+            # additional array the size of the dataset.
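+            # (one-pass formula: Var[x] = E[x^2] - E[x]^2, with the per-column
+            # sum of squares from einsum; clipped at zero to absorb round-off)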
+            self.mean_x_: np.ndarray = np.sum(data, axis=0) / n_samples
+            sum_squared = np.einsum('ij,ij->j', data, data, optimize=True)
+            variance = sum_squared / n_samples - np.square(self.mean_x_)
+            variance = np.maximum(variance, 0.0)
+            self.std_x_: np.ndarray = np.sqrt(variance)
 
         self.fitted_ = True  # nothing to fit
         return self
@@ -70,17 +84,35 @@ def transform(self, x: DataType) -> pd.Series:
 
         check_is_fitted(self)
 
+        original_index = x.index if isinstance(x, (pd.DataFrame, pd.Series)) else None
+        data = self._to_numpy(x)
+
+        if data.ndim == 1:
+            data = data.reshape(-1, 1)
+
         if self.scale:
             # standardization of the reconstruction error in X
             if np.all(self.std_x_ > 0):
-                x = (x - self.mean_x_) / self.std_x_
+                data = (data - self.mean_x_) / self.std_x_
             else:
-                x = x - self.mean_x_
-            # replace possible inf values with 0
-            x[np.isinf(x)] = 0
+                data = data - self.mean_x_
+            # replace possible inf values with 0
+            data[np.isinf(data)] = 0
+
+        if data.shape[1] == 0:
+            raise ValueError("Input data must contain at least one feature.")
 
-        scores = np.sqrt(np.mean(x ** 2, axis=1))
-        if isinstance(x, (pd.DataFrame, pd.Series)):
-            scores = pd.Series(scores, index=x.index)
+        mean_squared = np.einsum('ij,ij->i', data, data, optimize=True) / data.shape[1]
+        scores = np.sqrt(mean_squared)
+        if original_index is not None:
+            scores = pd.Series(scores, index=original_index)
 
         return scores
+
+    @staticmethod
+    def _to_numpy(x: DataType) -> np.ndarray:
+        if isinstance(x, (pd.DataFrame, pd.Series)):
+            return x.to_numpy(dtype=np.float64, copy=False)
+        return np.asarray(x, dtype=np.float64)
diff --git a/energy_fault_detector/api/__init__.py b/energy_fault_detector/api/__init__.py
new file mode 100644
index 0000000..33a22b6
--- /dev/null
+++ b/energy_fault_detector/api/__init__.py
@@ -0,0 +1,26 @@
+"""REST API utilities for the Energy Fault Detector package."""
+
+from __future__ import annotations
+
+from importlib import import_module
+from typing import Any, List
+
+__all__ = ["app"]
+
+
+def __getattr__(name: str) -> Any:
+    if name == "app":
+        module = import_module("energy_fault_detector.api.app")
+        return module.app
+    raise AttributeError(name)
+
+
+def __dir__() -> List[str]:  # pragma: no cover - small helper
+    return sorted(__all__)
+
diff --git a/energy_fault_detector/api/app.py b/energy_fault_detector/api/app.py
new file mode 100644
index 0000000..e308c24
--- /dev/null
+++ b/energy_fault_detector/api/app.py
@@ -0,0 +1,210 @@
+"""FastAPI application exposing prediction endpoints."""
+
+from __future__ import annotations
+
+import json
+import logging
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+import pandas as pd
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field, validator
+
+from energy_fault_detector.quick_fault_detection import quick_fault_detector
+
+from .model_registry import ModelNotFoundError, ModelRegistry
+from .settings import get_settings
+
+
+logger = logging.getLogger(__name__)
+
+
+def _merge_ignore_patterns(defaults: List[str], overrides: List[str]) -> List[str]:
+    merged: List[str] = []
+    for values in (defaults or [], overrides or []):
+        for value in values:
+            if value not in merged:
+                merged.append(value)
+    return merged
+
+
+def _dataframe_to_records(df: Optional[pd.DataFrame]) -> List[Dict[str, Any]]:
+    if df is None or df.empty:
+        return []
+
+    serialisable = 
df.reset_index() + serialisable = serialisable.where(pd.notnull(serialisable), None) + return json.loads(serialisable.to_json(orient="records", date_format="iso")) + + +def _determine_asset_name(model_name: str, model_version: str, explicit: Optional[str], template: str) -> str: + if explicit: + return explicit + + try: + return template.format(model_name=model_name, model_version=model_version) + except KeyError as exc: # pragma: no cover - configuration error + raise HTTPException( + status_code=500, + detail=f"Invalid asset name template; missing placeholder: {exc}", + ) from exc + + +def _derive_artifact_directory(save_root: Optional[Path], data_path: Path, asset_name: str) -> Path: + base_dir = save_root if save_root is not None else data_path.resolve().parent + return base_dir / "prediction_output" / asset_name + + +class PredictionRequest(BaseModel): + """Request body for the prediction endpoint.""" + + model_name: str = Field(..., description="Logical name of the model to load.") + data_path: str = Field(..., description="Path to the CSV file that should be analysed.") + model_version: Optional[str] = Field( + None, description="Specific model version to use. Defaults to the latest available version." + ) + ignore_features: List[str] = Field( + default_factory=list, + description="Feature names or wildcard patterns to exclude from root cause analysis.", + ) + asset_name: Optional[str] = Field( + None, description="Optional asset identifier to use when persisting prediction artefacts." + ) + debug_plots: Optional[bool] = Field( + None, description="Override the configuration setting that controls debug plot generation." + ) + + @validator("ignore_features", pre=True) + def _coerce_ignore_features(cls, value: Any) -> List[str]: # pragma: no cover - validation logic + if value is None: + return [] + if isinstance(value, str): + return [value] + return list(value) + + +class PredictionSummary(BaseModel): + total_samples: int + anomaly_samples: int + critical_samples: int + event_count: int + + +class PredictionResponse(BaseModel): + model_name: str + model_version: str + data_path: str + asset_name: str + ignore_features: List[str] + artifact_directory: Optional[str] + summary: PredictionSummary + events: List[Dict[str, Any]] + + +class HealthResponse(BaseModel): + status: str = "ok" + + +settings = get_settings() +model_registry = ModelRegistry( + root_directory=settings.model_store.root_directory, + default_version_strategy=settings.model_store.default_version_strategy, +) + +app = FastAPI(title="Energy Fault Detector", version="1.0.0") + + +@app.get("/health", response_model=HealthResponse) +def health_check() -> HealthResponse: + """Simple endpoint to verify that the service is running.""" + + return HealthResponse() + + +@app.post("/predict", response_model=PredictionResponse) +def predict(request: PredictionRequest) -> PredictionResponse: + """Execute the quick fault detector in prediction mode.""" + + try: + model_path, resolved_version = model_registry.resolve( + model_name=request.model_name, model_version=request.model_version + ) + except ModelNotFoundError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + + data_path = Path(request.data_path).expanduser().resolve() + if not data_path.exists(): + raise HTTPException(status_code=400, detail=f"Data file '{data_path}' does not exist.") + + ignore_patterns = _merge_ignore_patterns( + settings.prediction.default_ignore_features, request.ignore_features + ) + debug_plots = request.debug_plots if 
request.debug_plots is not None else settings.prediction.debug_plots + + asset_name = _determine_asset_name( + model_name=request.model_name, + model_version=resolved_version, + explicit=request.asset_name, + template=settings.prediction.default_asset_name, + ) + + save_root = settings.prediction.output_directory + if save_root is not None: + save_root.mkdir(parents=True, exist_ok=True) + + try: + prediction_results, event_metadata, _ = quick_fault_detector( + csv_data_path=None, + csv_test_data_path=str(data_path), + train_test_column_name=settings.prediction.train_test_column, + train_test_mapping=settings.prediction.train_test_mapping or None, + time_column_name=settings.prediction.time_column, + status_data_column_name=settings.prediction.status_column, + status_mapping=settings.prediction.status_mapping or None, + min_anomaly_length=settings.prediction.min_anomaly_length, + enable_debug_plots=debug_plots, + save_dir=str(save_root) if save_root is not None else None, + mode="predict", + model_path=str(model_path), + asset_name=asset_name, + rca_ignore_features=ignore_patterns or None, + ) + except FileNotFoundError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + except Exception as exc: # pragma: no cover - defensive programming + logger.exception("Prediction request failed: %s", exc) + raise HTTPException(status_code=500, detail="Prediction failed. See logs for details.") from exc + + predicted_anomalies = prediction_results.predicted_anomalies.copy() + total_samples = int(len(predicted_anomalies)) + anomaly_samples = int(predicted_anomalies.get("anomaly", pd.Series(dtype=bool)).fillna(False).sum()) + critical_samples = int( + predicted_anomalies.get("critical_event", pd.Series(dtype=bool)).fillna(False).sum() + ) + event_count = int(len(event_metadata)) if event_metadata is not None else 0 + + artifact_directory = _derive_artifact_directory(save_root, data_path, asset_name) + + response = PredictionResponse( + model_name=request.model_name, + model_version=resolved_version, + data_path=str(data_path), + asset_name=asset_name, + ignore_features=ignore_patterns, + artifact_directory=str(artifact_directory) if artifact_directory is not None else None, + summary=PredictionSummary( + total_samples=total_samples, + anomaly_samples=anomaly_samples, + critical_samples=critical_samples, + event_count=event_count, + ), + events=_dataframe_to_records(event_metadata), + ) + + return response + + +__all__ = ["app", "PredictionRequest", "PredictionResponse", "PredictionSummary"] diff --git a/energy_fault_detector/api/artifacts/.gitkeep b/energy_fault_detector/api/artifacts/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/energy_fault_detector/api/model_registry.py b/energy_fault_detector/api/model_registry.py new file mode 100644 index 0000000..a543593 --- /dev/null +++ b/energy_fault_detector/api/model_registry.py @@ -0,0 +1,91 @@ +"""Utilities for locating trained model artefacts.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, List, Optional, Tuple + + +class ModelNotFoundError(FileNotFoundError): + """Raised when the requested model or version does not exist.""" + + +def _looks_like_model_directory(path: Path) -> bool: + """Heuristic to determine whether ``path`` contains model artefacts.""" + + if not path.is_dir(): + return False + + if (path / "config.yaml").exists(): + 
return True + + expected_components = {"data_preprocessor", "autoencoder", "threshold_selector", "anomaly_score"} + child_directories = {child.name for child in path.iterdir() if child.is_dir()} + return bool(expected_components & child_directories) + + +def _sort_versions(versions: Iterable[Path]) -> List[Path]: + return sorted(versions, key=lambda version: version.name) + + +@dataclass +class ModelRegistry: + """Simple model registry resolving model names to artefact locations.""" + + root_directory: Path + default_version_strategy: str = "latest" + + def __post_init__(self) -> None: + self.root_directory = Path(self.root_directory).resolve() + + def resolve(self, model_name: str, model_version: Optional[str] = None) -> Tuple[Path, str]: + """Return the filesystem path for ``model_name`` and ``model_version``.""" + + model_root = self.root_directory / model_name + if not model_root.exists(): + raise ModelNotFoundError(f"Model '{model_name}' was not found under '{self.root_directory}'.") + + if model_version: + candidate = model_root / model_version + if not _looks_like_model_directory(candidate): + raise ModelNotFoundError( + f"Model '{model_name}' does not provide a version named '{model_version}'." + ) + return candidate, model_version + + versions = [child for child in model_root.iterdir() if _looks_like_model_directory(child)] + + if versions: + if self.default_version_strategy != "latest": + raise ValueError( + f"Unsupported version selection strategy '{self.default_version_strategy}'." + ) + selected = _sort_versions(versions)[-1] + return selected, selected.name + + if _looks_like_model_directory(model_root): + return model_root, model_root.name + + raise ModelNotFoundError( + f"No model versions found for '{model_name}' in '{model_root}'. Ensure the expected directory structure exists." 
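+            # the registry expects artefacts laid out as
+            # <root_directory>/<model_name>/<model_version>/ (see service_config.yaml)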
+ ) + + def list_versions(self, model_name: str) -> List[str]: + """Return all discovered versions for ``model_name``.""" + + model_root = self.root_directory / model_name + if not model_root.exists(): + raise ModelNotFoundError(f"Model '{model_name}' was not found under '{self.root_directory}'.") + + versions = [child for child in model_root.iterdir() if _looks_like_model_directory(child)] + if versions: + return [path.name for path in _sort_versions(versions)] + + if _looks_like_model_directory(model_root): + return [model_root.name] + + return [] + + +__all__ = ["ModelRegistry", "ModelNotFoundError"] diff --git a/energy_fault_detector/api/models/.gitkeep b/energy_fault_detector/api/models/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/energy_fault_detector/api/prediction_api.py b/energy_fault_detector/api/prediction_api.py new file mode 100644 index 0000000..5ec7cce --- /dev/null +++ b/energy_fault_detector/api/prediction_api.py @@ -0,0 +1,462 @@ +"""FastAPI application exposing the quick fault detector prediction endpoint.""" + +from __future__ import annotations + +import logging +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple + +import pandas as pd +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, Field + +from energy_fault_detector._logs import setup_logging +from energy_fault_detector.config import Config +from energy_fault_detector.core.fault_detection_result import FaultDetectionResult +from energy_fault_detector.fault_detector import FaultDetector +from energy_fault_detector.quick_fault_detection.data_loading import get_sensor_data +from energy_fault_detector.quick_fault_detection.quick_fault_detector import analyze_event +from energy_fault_detector.utils.analysis import create_events + +DEFAULT_TIMESTAMP_COLUMN = "time_stamp" +DEFAULT_MIN_EVENT_LENGTH = 18 + +LOGGING_CONFIG_PATH = Path(__file__).resolve().parent.parent / "logging.yaml" +setup_logging(str(LOGGING_CONFIG_PATH)) + +logger = logging.getLogger("energy_fault_detector.prediction_api") + + +class PredictionAPIError(Exception): + """Base class for handled prediction API exceptions.""" + + status_code: int = 500 + error_code: str = "EFD_INTERNAL_ERROR" + + def __init__(self, message: str) -> None: + super().__init__(message) + self.message = message + + +class InvalidModelError(PredictionAPIError): + """Raised when a referenced model cannot be loaded.""" + + status_code = 404 + error_code = "EFD_MODEL_NOT_FOUND" + + +class SchemaMismatchError(PredictionAPIError): + """Raised when the input schema does not match the model expectations.""" + + status_code = 409 + error_code = "EFD_SCHEMA_MISMATCH" + + +class DataTypeMismatchError(PredictionAPIError): + """Raised when data types of provided columns do not match the trained model.""" + + status_code = 422 + error_code = "EFD_DATATYPE_MISMATCH" + + +class TimestampValidationError(PredictionAPIError): + """Raised when the timestamp column is missing or cannot be parsed.""" + + status_code = 422 + error_code = "EFD_TIMESTAMP_INVALID" + + +class EmptyInputError(PredictionAPIError): + """Raised when the provided payload does not contain any rows.""" + + status_code = 400 + error_code = "EFD_EMPTY_PAYLOAD" + + +class EventMetadata(BaseModel): + """Metadata describing an anomaly event.""" + + event_id: int = Field(..., description="Identifier of the anomaly event.") + start: datetime = Field(..., description="Start timestamp of the event.") + 
end: datetime = Field(..., description="End timestamp of the event.") + duration_seconds: float = Field(..., ge=0, description="Duration of the event in seconds.") + + +class SensorPoint(BaseModel): + """Time-series point describing sensor values and anomaly scores.""" + + timestamp: datetime = Field(..., description="Timestamp of the sensor snapshot.") + anomaly_score: Optional[float] = Field(None, description="Computed anomaly score.") + threshold_score: Optional[float] = Field(None, description="Threshold used for anomaly decision.") + behaviour: Optional[str] = Field(None, description="Model assessment of the behaviour at this timestamp.") + cumulative_anomaly_score: Optional[int] = Field(None, description="Cumulative anomaly counter.") + sensors: Dict[str, Optional[float]] = Field(..., description="Sensor readings contributing to the event.") + + +class EventSensorData(BaseModel): + """Detailed sensor time series for an event, including ARCANA summaries.""" + + event_id: int + points: List[SensorPoint] + arcana_mean_importances: Dict[str, float] = Field(default_factory=dict) + arcana_losses: Optional[List[Dict[str, Any]]] = None + + +class PredictionSuccessResponse(BaseModel): + """Successful prediction payload.""" + + status: str = Field("success", const=True) + events: List[EventMetadata] + event_sensor_data: List[EventSensorData] + + +class PredictionRequest(BaseModel): + """Payload describing a prediction request.""" + + model_path: str = Field(..., description="Path to the saved fault detector model.") + data: List[Dict[str, Any]] = Field(..., min_items=1, description="Prediction data as list of records.") + timestamp_column: str = Field( + DEFAULT_TIMESTAMP_COLUMN, + description="Name of the timestamp column in the provided data.", + ) + min_event_length: int = Field( + DEFAULT_MIN_EVENT_LENGTH, + ge=1, + description="Minimum number of consecutive anomalies required to form an event.", + ) + + +def _load_fault_detector(model_path: str) -> FaultDetector: + """Instantiate and load a :class:`FaultDetector` from disk.""" + + try: + fallback_config_path = Path(__file__).resolve().parent.parent / "base_config.yaml" + fallback_config = Config(str(fallback_config_path)) + detector = FaultDetector(config=fallback_config) + detector.load_models(model_path=model_path) + except FileNotFoundError as exc: # pragma: no cover - dependent on filesystem + raise InvalidModelError( + f"The provided model path '{model_path}' does not exist." + ) from exc + except (OSError, ValueError) as exc: # pragma: no cover - dependent on filesystem + raise InvalidModelError( + f"Failed to load model artefacts from '{model_path}'." 
+ ) from exc + return detector + + +def _ensure_datetime(value: Any) -> datetime: + """Convert various timestamp representations to :class:`datetime`.""" + + if isinstance(value, pd.Timestamp): + return value.to_pydatetime() + if isinstance(value, datetime): + return value + return pd.to_datetime(value).to_pydatetime() + + +def _to_optional_float(value: Any) -> Optional[float]: + """Convert numeric values to floats while preserving missing values.""" + + if value is None: + return None + if isinstance(value, (float, int)): + return float(value) + if pd.isna(value): + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _extract_expected_columns(data_preprocessor: Any) -> List[str]: + """Retrieve the expected input columns from the saved data preprocessor.""" + + if data_preprocessor is None: + return [] + + named_steps = getattr(data_preprocessor, "named_steps", {}) + column_selector = named_steps.get("column_selector") + if column_selector is not None and hasattr(column_selector, "feature_names_out_"): + return list(column_selector.feature_names_out_) + + imputer = named_steps.get("imputer") + if imputer is not None and hasattr(imputer, "feature_names_in_"): + return list(imputer.feature_names_in_) + + if hasattr(data_preprocessor, "get_feature_names_out"): + try: + return list(data_preprocessor.get_feature_names_out()) + except ValueError: # pragma: no cover - depends on sklearn internals + pass + + if hasattr(data_preprocessor, "feature_names_in_"): + return list(data_preprocessor.feature_names_in_) + + return [] + + +def _validate_input_schema( + sensor_data: pd.DataFrame, + expected_columns: Sequence[str], + original_columns: Iterable[str], +) -> None: + """Validate that provided sensor data matches model expectations.""" + + original_column_set = set(original_columns) + + missing_columns: List[str] = [] + dtype_issues: List[str] = [] + for column in expected_columns: + if column not in sensor_data.columns: + if column in original_column_set: + dtype_issues.append(column) + else: + missing_columns.append(column) + continue + + converted = pd.to_numeric(sensor_data[column], errors="coerce") + non_convertible = sensor_data[column].notna() & converted.isna() + if non_convertible.any(): + dtype_issues.append(column) + + if missing_columns: + formatted = ", ".join(sorted(missing_columns)) + raise SchemaMismatchError( + f"Input data is missing required features: {formatted}." + ) + + if dtype_issues: + formatted = ", ".join(sorted(set(dtype_issues))) + raise DataTypeMismatchError( + f"The following features contain non-numeric values: {formatted}." 
+        )
+
+
+def _build_event_sensor_payload(
+    event_id: int,
+    event_data: pd.DataFrame,
+    predicted_anomalies: pd.DataFrame,
+    arcana_mean_importances: Optional[pd.Series],
+    arcana_losses: Optional[pd.DataFrame],
+) -> EventSensorData:
+    """Create the response payload for a single anomaly event."""
+
+    aligned_anomalies = predicted_anomalies.reindex(event_data.index)
+
+    points: List[SensorPoint] = []
+    for (timestamp, sensor_row), (_, anomaly_row) in zip(event_data.iterrows(), aligned_anomalies.iterrows()):
+        sensors = {column: _to_optional_float(value) for column, value in sensor_row.items()}
+        anomaly_score = anomaly_row.get("anomaly_score")
+        threshold_value = anomaly_row.get("threshold_score")
+        cumulative_score = anomaly_row.get("cumulative_anomaly_score")
+
+        point = SensorPoint(
+            timestamp=_ensure_datetime(timestamp),
+            anomaly_score=_to_optional_float(anomaly_score),
+            threshold_score=_to_optional_float(threshold_value),
+            behaviour=str(anomaly_row.get("behaviour")) if anomaly_row.get("behaviour") is not None else None,
+            cumulative_anomaly_score=int(cumulative_score) if pd.notna(cumulative_score) else None,
+            sensors=sensors,
+        )
+        points.append(point)
+
+    mean_importances = (
+        {feature: float(value) for feature, value in arcana_mean_importances.items()}
+        if arcana_mean_importances is not None
+        else {}
+    )
+
+    if arcana_losses is not None:
+        losses_payload = (
+            arcana_losses.reset_index()
+            .rename(columns={arcana_losses.index.name or "index": "iteration"})
+            .to_dict(orient="records")
+        )
+    else:
+        losses_payload = None
+
+    return EventSensorData(
+        event_id=event_id,
+        points=points,
+        arcana_mean_importances=mean_importances,
+        arcana_losses=losses_payload,
+    )
+
+
+def run_prediction(
+    request: PredictionRequest,
+    detector_loader: Optional[Callable[[str], FaultDetector]] = None,
+    events_factory: Optional[
+        Callable[[pd.DataFrame, pd.Series, int], Tuple[pd.DataFrame, List[pd.DataFrame]]]
+    ] = None,
+    event_analyzer: Optional[
+        Callable[[FaultDetector, pd.DataFrame, bool], Tuple[pd.Series, pd.DataFrame]]
+    ] = None,
+) -> PredictionSuccessResponse:
+    """Execute a prediction run and build the response payload."""
+
+    if not request.data:
+        raise EmptyInputError("No prediction data provided.")
+
+    timestamp_column = request.timestamp_column or DEFAULT_TIMESTAMP_COLUMN
+
+    data_frame = pd.DataFrame(request.data)
+    if data_frame.empty:
+        raise EmptyInputError("No prediction data provided.")
+
+    if timestamp_column not in data_frame.columns:
+        raise TimestampValidationError(
+            f"Timestamp column '{timestamp_column}' is missing from the input data."
+        )
+
+    try:
+        timestamp_index = pd.to_datetime(data_frame[timestamp_column], errors="raise")
+    except (ValueError, TypeError) as exc:
+        raise TimestampValidationError(
+            f"Failed to parse timestamp column '{timestamp_column}': {exc}"
+        ) from exc
+
+    feature_data = data_frame.drop(columns=[timestamp_column])
+    feature_data.index = timestamp_index
+    original_columns = list(feature_data.columns)
+
+    sensor_data = get_sensor_data(feature_data.copy())
+    if sensor_data.empty:
+        raise DataTypeMismatchError(
+            "None of the provided features could be converted to numeric sensor values."
+ ) + + loader = detector_loader or _load_fault_detector + detector = loader(request.model_path) + + expected_columns = _extract_expected_columns(detector.data_preprocessor) + if expected_columns: + _validate_input_schema(sensor_data, expected_columns, original_columns) + sensor_data = sensor_data[expected_columns] + else: + expected_columns = list(sensor_data.columns) + + sensor_data = sensor_data.sort_index() + + prediction_results: FaultDetectionResult = detector.predict( + sensor_data=sensor_data, root_cause_analysis=True + ) + predicted_anomalies = prediction_results.predicted_anomalies.copy() + anomalies = predicted_anomalies["anomaly"] + + event_creator = events_factory or create_events + event_meta_data, event_data_list = event_creator( + sensor_data=sensor_data, + boolean_information=anomalies, + min_event_length=request.min_event_length, + ) + + event_ids = list(range(1, len(event_data_list) + 1)) + + predicted_anomalies["event_id"] = pd.Series( + pd.NA, index=predicted_anomalies.index, dtype="Int64" + ) + predicted_anomalies["critical_event"] = False + for event_id, event_data in zip(event_ids, event_data_list): + event_index = event_data.index + predicted_anomalies.loc[event_index, "event_id"] = event_id + predicted_anomalies.loc[event_index, "critical_event"] = True + + analyzer = event_analyzer or analyze_event + event_sensor_payload: List[EventSensorData] = [] + for event_id, event_data in zip(event_ids, event_data_list): + arcana_mean_importances, arcana_losses = analyzer( + anomaly_detector=detector, + event_data=event_data, + track_losses=False, + ) + event_sensor_payload.append( + _build_event_sensor_payload( + event_id=event_id, + event_data=event_data, + predicted_anomalies=predicted_anomalies, + arcana_mean_importances=arcana_mean_importances, + arcana_losses=arcana_losses if arcana_losses is not None and not arcana_losses.empty else None, + ) + ) + + events_payload: List[EventMetadata] = [] + for event_id, (_, meta_row) in zip(event_ids, event_meta_data.iterrows()): + duration = meta_row["duration"] + if isinstance(duration, pd.Timedelta): + duration_seconds = float(duration.total_seconds()) + else: + duration_seconds = float(duration) + events_payload.append( + EventMetadata( + event_id=event_id, + start=_ensure_datetime(meta_row["start"]), + end=_ensure_datetime(meta_row["end"]), + duration_seconds=duration_seconds, + ) + ) + + logger.info( + "Prediction finished for model '%s'. 
Detected %d anomaly events.", + request.model_path, + len(events_payload), + ) + + return PredictionSuccessResponse( + status="success", + events=events_payload, + event_sensor_data=event_sensor_payload, + ) + + +app = FastAPI(title="Energy Fault Detector Prediction API") + + +@app.post( + "/predict", + response_model=PredictionSuccessResponse, + responses={ + 404: {"description": "Model not found."}, + 409: {"description": "Schema mismatch."}, + 422: {"description": "Data validation error."}, + 500: {"description": "Internal server error."}, + }, +) +async def predict(request: PredictionRequest) -> PredictionSuccessResponse: + """Predict anomalies for the provided payload.""" + + try: + return run_prediction(request) + except PredictionAPIError as exc: + logger.exception("Prediction request failed: %s", exc.message) + raise HTTPException( + status_code=exc.status_code, + detail={"status": "error", "code": exc.error_code, "message": exc.message}, + ) from exc + except Exception as exc: # pragma: no cover - safeguard for unexpected failures + logger.exception("Unexpected error during prediction request handling.") + raise HTTPException( + status_code=500, + detail={ + "status": "error", + "code": "EFD_INTERNAL_ERROR", + "message": "Unexpected internal server error.", + }, + ) from exc + + +__all__ = [ + "app", + "run_prediction", + "PredictionRequest", + "PredictionSuccessResponse", + "PredictionAPIError", + "SchemaMismatchError", + "DataTypeMismatchError", + "InvalidModelError", + "TimestampValidationError", + "EmptyInputError", +] diff --git a/energy_fault_detector/api/service_config.yaml b/energy_fault_detector/api/service_config.yaml new file mode 100644 index 0000000..1b8dcc7 --- /dev/null +++ b/energy_fault_detector/api/service_config.yaml @@ -0,0 +1,16 @@ +model_store: + root_directory: "./models" + default_version_strategy: "latest" + +prediction: + config_path: "../base_config.yaml" + time_column: "time_stamp" + status_column: null + status_mapping: {} + train_test_column: null + train_test_mapping: {} + min_anomaly_length: 18 + debug_plots: false + output_directory: "./artifacts" + default_asset_name: "{model_name}" + default_ignore_features: [] diff --git a/energy_fault_detector/api/settings.py b/energy_fault_detector/api/settings.py new file mode 100644 index 0000000..02b92ea --- /dev/null +++ b/energy_fault_detector/api/settings.py @@ -0,0 +1,138 @@ +"""Settings loader for the REST API.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from functools import lru_cache +from pathlib import Path +from typing import Dict, List, Optional + +import yaml + + +DEFAULT_CONFIG_PATH = Path(__file__).with_name("service_config.yaml") +CONFIG_ENV_VAR = "EFD_SERVICE_CONFIG" + + +@dataclass +class ModelStoreSettings: + """Settings describing where trained models are stored.""" + + root_directory: Path + default_version_strategy: str = "latest" + + +@dataclass +class PredictionSettings: + """Settings that control how predictions are executed.""" + + config_path: Path + time_column: Optional[str] = None + status_column: Optional[str] = None + status_mapping: Dict[str, bool] = field(default_factory=dict) + train_test_column: Optional[str] = None + train_test_mapping: Dict[str, bool] = field(default_factory=dict) + min_anomaly_length: int = 18 + debug_plots: bool = False + output_directory: Optional[Path] = None + default_asset_name: str = "{model_name}" + default_ignore_features: List[str] = field(default_factory=list) + + +@dataclass +class 
APISettings: + """Container for all API settings.""" + + model_store: ModelStoreSettings + prediction: PredictionSettings + + +def _resolve_path(base_dir: Path, value: Optional[str]) -> Optional[Path]: + """Resolve ``value`` relative to ``base_dir`` when it is not absolute.""" + + if value in (None, ""): + return None + + path = Path(value) + if not path.is_absolute(): + path = (base_dir / path).resolve() + return path + + +def _load_yaml(config_path: Path) -> Dict[str, object]: + with open(config_path, "r", encoding="utf-8") as stream: + data = yaml.safe_load(stream) or {} + if not isinstance(data, dict): + raise ValueError("The service configuration must be a YAML mapping/dictionary.") + return data + + +@lru_cache() +def get_settings() -> APISettings: + """Return the API settings, loading them from disk on first access.""" + + configured_path = os.environ.get(CONFIG_ENV_VAR) + config_path = Path(configured_path) if configured_path else DEFAULT_CONFIG_PATH + if not config_path.exists(): + raise FileNotFoundError( + f"Service configuration file '{config_path}' could not be found." + ) + + config_path = config_path.resolve() + raw_config = _load_yaml(config_path) + base_dir = config_path.parent + + model_store_cfg = raw_config.get("model_store", {}) or {} + prediction_cfg = raw_config.get("prediction", {}) or {} + + model_root = _resolve_path(base_dir, model_store_cfg.get("root_directory", "./models")) + if model_root is None: + raise ValueError("A model store root directory must be provided in the configuration.") + + version_strategy = model_store_cfg.get("default_version_strategy", "latest") + + prediction_config_path = _resolve_path( + base_dir, prediction_cfg.get("config_path", "../base_config.yaml") + ) + if prediction_config_path is None or not prediction_config_path.exists(): + raise FileNotFoundError( + f"Prediction configuration file '{prediction_config_path}' could not be found." 
+ ) + + output_directory = _resolve_path(base_dir, prediction_cfg.get("output_directory")) + + default_asset_name = prediction_cfg.get("default_asset_name", "{model_name}") + default_ignore = prediction_cfg.get("default_ignore_features") or [] + + settings = APISettings( + model_store=ModelStoreSettings( + root_directory=model_root, + default_version_strategy=str(version_strategy), + ), + prediction=PredictionSettings( + config_path=prediction_config_path, + time_column=prediction_cfg.get("time_column"), + status_column=prediction_cfg.get("status_column"), + status_mapping=dict(prediction_cfg.get("status_mapping") or {}), + train_test_column=prediction_cfg.get("train_test_column"), + train_test_mapping=dict(prediction_cfg.get("train_test_mapping") or {}), + min_anomaly_length=int(prediction_cfg.get("min_anomaly_length", 18)), + debug_plots=bool(prediction_cfg.get("debug_plots", False)), + output_directory=output_directory, + default_asset_name=str(default_asset_name), + default_ignore_features=list(default_ignore), + ), + ) + + return settings + + +__all__ = [ + "APISettings", + "ModelStoreSettings", + "PredictionSettings", + "CONFIG_ENV_VAR", + "DEFAULT_CONFIG_PATH", + "get_settings", +] diff --git a/energy_fault_detector/base_config.yaml b/energy_fault_detector/base_config.yaml index f708a7e..e5fb253 100644 --- a/energy_fault_detector/base_config.yaml +++ b/energy_fault_detector/base_config.yaml @@ -44,5 +44,13 @@ train: root_cause_analysis: alpha: 0.5 init_x_bias: recon - num_iter: 1000 - verbose: true \ No newline at end of file + num_iter: 900 + verbose: true + ignore_features: + # Patterns apply to anomaly scoring and ARCANA. Wildcards such as "windspeed*" are supported. + # list exact column names or use wildcards such as "windspeed*" + - wind_speed_59_avg + - power_58_avg + - wind_speed_60_avg + - wind_speed_61_avg + - power_62_avg \ No newline at end of file diff --git a/energy_fault_detector/config/config.py b/energy_fault_detector/config/config.py index 9722615..15df690 100644 --- a/energy_fault_detector/config/config.py +++ b/energy_fault_detector/config/config.py @@ -76,6 +76,11 @@ 'num_iter': {'type': 'integer', 'required': False}, 'epsilon': {'type': 'float', 'required': False}, 'verbose': {'type': 'boolean', 'required': False}, + 'ignore_features': { + 'type': 'list', + 'required': False, + 'schema': {'type': 'string'}, + }, } PREDICT_SCHEMA = { diff --git a/energy_fault_detector/core/fault_detection_model.py b/energy_fault_detector/core/fault_detection_model.py index b4ee8a3..2933dd7 100644 --- a/energy_fault_detector/core/fault_detection_model.py +++ b/energy_fault_detector/core/fault_detection_model.py @@ -209,9 +209,22 @@ def load_models(self, model_path: str, load_threshold: bool = True) -> None: model_directory=os.path.join(model_path, SCORE_DIR) ) # for backwards compatibility - check whether config was saved: - if os.path.exists(os.path.join(model_path, 'config.yaml')): - self.config = Config(os.path.join(model_path, 'config.yaml')) - self._model_factory = ModelFactory(self.config) + config_path = os.path.join(model_path, 'config.yaml') + if os.path.exists(config_path): + loaded_config = Config(config_path) + if self.config is None: + self.config = loaded_config + else: + new_config_dict = loaded_config.config_dict.copy() + existing_rca = self.config.config_dict.get('root_cause_analysis', {}) + if existing_rca: + new_rca = new_config_dict.setdefault('root_cause_analysis', {}) + if 'ignore_features' not in new_rca and 'ignore_features' in existing_rca: + 
new_rca['ignore_features'] = existing_rca['ignore_features'] + self.config.update_config(new_config_dict) + self.config.configuration_file = config_path + if self._model_factory is None: + self._model_factory = ModelFactory(self.config) @staticmethod def _load_pickled_model(model_type: str, model_directory: str): diff --git a/energy_fault_detector/core/fault_detection_result.py b/energy_fault_detector/core/fault_detection_result.py index 716a1ad..df08e81 100644 --- a/energy_fault_detector/core/fault_detection_result.py +++ b/energy_fault_detector/core/fault_detection_result.py @@ -12,7 +12,7 @@ class FaultDetectionResult: """Class to encapsulate results from the fault detection process.""" predicted_anomalies: pd.DataFrame - """DataFrame with a column 'anomaly' (bool).""" + """DataFrame containing anomaly indicators and metadata per timestamp.""" reconstruction: pd.DataFrame """DataFrame with reconstruction of the sensor data with timestamp as index.""" diff --git a/energy_fault_detector/data_preprocessing/column_selector.py b/energy_fault_detector/data_preprocessing/column_selector.py index 3274cbf..b897678 100644 --- a/energy_fault_detector/data_preprocessing/column_selector.py +++ b/energy_fault_detector/data_preprocessing/column_selector.py @@ -69,8 +69,13 @@ def transform(self, x: pd.DataFrame) -> pd.DataFrame: # transformation is not possible. missing_columns = set(self.feature_names_out_) - set(x.columns) if len(missing_columns) > 0: - raise ValueError('ColumnSelector: There are columns missing in the prediction data, which were present in' - ' the training data. New models need to be trained!') + missing_columns_sorted = sorted(missing_columns) + missing_columns_str = ', '.join(missing_columns_sorted) + raise ValueError( + 'ColumnSelector: There are columns missing in the prediction data, which were present in' + ' the training data. Missing columns: ' + f"{missing_columns_str}. New models need to be trained!" + ) x = x[self.feature_names_out_] # ensure ordering return x diff --git a/energy_fault_detector/fault_detector.py b/energy_fault_detector/fault_detector.py index 207f2e8..2fd626e 100644 --- a/energy_fault_detector/fault_detector.py +++ b/energy_fault_detector/fault_detector.py @@ -1,7 +1,7 @@ """Main fault detection class""" import logging -from typing import Optional, Any, Tuple, List +from typing import Optional, Any, Tuple, List, Iterable, Dict from datetime import datetime import os @@ -17,6 +17,7 @@ from energy_fault_detector.config import Config from energy_fault_detector._logs import setup_logging from energy_fault_detector.core.fault_detection_result import FaultDetectionResult, ModelMetadata +from energy_fault_detector.utils.feature_filters import mask_ignored_features setup_logging(os.path.join(os.path.dirname(__file__), 'logging.yaml')) logger = logging.getLogger('energy_fault_detector') @@ -89,7 +90,7 @@ def preprocess_train_data(self, sensor_data: pd.DataFrame, normal_index: pd.Seri def fit(self, sensor_data: pd.DataFrame, normal_index: pd.Series = None, save_models: bool = True, overwrite_models: bool = False, fit_autoencoder_only: bool = False, fit_preprocessor: bool = True, - **kwargs) -> ModelMetadata: + model_name: Optional[str] = None, **kwargs) -> ModelMetadata: """Fit models on the given sensor_data and save them locally and return the metadata. Args: @@ -104,6 +105,8 @@ def fit(self, sensor_data: pd.DataFrame, normal_index: pd.Series = None, save_mo fit_autoencoder_only (bool, optional): If True, only fit the data preprocessor and autoencoder objects. 
Defaults to False. fit_preprocessor (bool, optional): If True, the preprocessor is fitted. Defaults to True. + model_name (Optional[str], optional): Custom directory name for the saved model files. Defaults to + 'trained_model'. Returns: ModelMetadata: metadata of the trained model: model_date, model_path, model reconstruction errors @@ -128,7 +131,8 @@ def fit(self, sensor_data: pd.DataFrame, normal_index: pd.Series = None, save_mo # save the models if save_models: - model_path, model_date = self.save_models(model_name='trained_model', overwrite=overwrite_models) + save_dir_name = model_name or 'trained_model' + model_path, model_date = self.save_models(model_name=save_dir_name, overwrite=overwrite_models) else: model_date = datetime.now().strftime("%Y%m%d_%H%M%S") return ModelMetadata( @@ -225,7 +229,6 @@ def tune(self, sensor_data: pd.DataFrame, normal_index: pd.Series, else: model_date = datetime.now().strftime("%Y%m%d_%H%M%S") return ModelMetadata( - model_path=model_path, model_date=model_date, train_recon_error=train_recon_error, val_recon_error=val_recon_error @@ -233,7 +236,8 @@ def tune(self, sensor_data: pd.DataFrame, normal_index: pd.Series, def predict(self, sensor_data: pd.DataFrame, model_path: Optional[str] = None, root_cause_analysis: bool = False, track_losses: bool = False, - track_bias: bool = False) -> FaultDetectionResult: + track_bias: bool = False, ignore_features: Optional[Iterable[str]] = None + ) -> FaultDetectionResult: """Predict with given models for a specific asset Args: @@ -244,6 +248,8 @@ def predict(self, sensor_data: pd.DataFrame, model_path: Optional[str] = None, root_cause_analysis (bool, optional): Whether to run ARCANA. Defaults to False. track_losses (bool, optional): Optional; if True, ARCANA losses will be tracked over the iterations. Defaults to False. track_bias (bool, optional): Optional; if True, ARCANA bias will be tracked over the iterations. Defaults to False. + ignore_features (Optional[Iterable[str]], optional): Iterable of patterns identifying features that should be + excluded from anomaly scoring and root cause analysis. Overrides the configuration defaults when provided. 
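+
+        Example (a minimal sketch; ``detector`` is a fitted ``FaultDetector`` and the
+        pattern is illustrative)::
+
+            result = detector.predict(sensor_df, ignore_features=['windspeed*'])
+            flagged = result.predicted_anomalies['anomaly']
+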
         Returns:
             FaultDetectionResult: with the following DataFrames:
@@ -266,26 +272,78 @@ def predict(self, sensor_data: pd.DataFrame, model_path: Optional[str] = None,
         x_prepped = self.data_preprocessor.transform(x).sort_index()
         column_order = x_prepped.columns
 
         if hasattr(self.autoencoder, 'conditional_features'):
             x_predicted = self.autoencoder.predict(x_prepped, return_conditions=True, verbose=self.config.verbose)
             x_predicted = x_predicted[column_order]
         else:
             x_predicted = self.autoencoder.predict(x_prepped, verbose=self.config.verbose)
 
+        configured_ignore_patterns: Optional[Tuple[str, ...]]
+        if ignore_features is not None:
+            configured_ignore_patterns = tuple(ignore_features)
+        else:
+            configured_ignore_patterns = self._ignore_feature_patterns or None
+
         recon_error = self.autoencoder.get_reconstruction_error(x_prepped)
+        recon_error = self._mask_reconstruction_error(
+            recon_error,
+            'anomaly scoring (prediction)',
+            ignore_feature_patterns=configured_ignore_patterns,
+        )
         # inverse transform predictions so they are comparable to the raw data
         df_inverse_scaled_pred = self.data_preprocessor.inverse_transform(x_predicted)
 
         scores = self.anomaly_score.transform(recon_error)
-        predicted_anomalies = self.predict_anomalies(scores, x_prepped)
-        predicted_anomalies = pd.DataFrame(data=predicted_anomalies, columns=['anomaly'], index=scores.index)
+
+        if isinstance(scores, pd.DataFrame):
+            if scores.shape[1] != 1:
+                raise ValueError('Anomaly score DataFrame must contain exactly one column.')
+            score_series = scores.iloc[:, 0]
+        elif isinstance(scores, pd.Series):
+            score_series = scores
+        else:
+            score_series = pd.Series(scores, index=x_prepped.index)
+
+        predicted_anomalies = self.predict_anomalies(score_series, x_prepped)
+        predicted_anomalies = pd.DataFrame(data=predicted_anomalies, columns=['anomaly'], index=score_series.index)
+
+        threshold_value = getattr(self.threshold_selector, 'threshold', None)
+
+        error_magnitudes = recon_error.reindex(predicted_anomalies.index).abs()
+        top_attributes = error_magnitudes.apply(
+            lambda row: [col for col in row[row > 0].sort_values(ascending=False).index[:3]], axis=1
+        )
+        top_attribute_strings = top_attributes.apply(lambda attrs: ', '.join(attrs))
+
+        predicted_anomalies['behaviour'] = predicted_anomalies['anomaly'].map(
+            {True: 'anomaly', False: 'normal'}
+        )
+        predicted_anomalies['anomaly_score'] = score_series.reindex(predicted_anomalies.index)
+        predicted_anomalies['threshold_score'] = threshold_value
+        anomaly_flags = predicted_anomalies['anomaly'].astype(int)
+        predicted_anomalies['cumulative_anomaly_score'] = (
+            anomaly_flags.groupby((anomaly_flags == 0).cumsum()).cumsum()
+        )
+        predicted_anomalies['anomalous_fields'] = ''
+        anomaly_indices = predicted_anomalies['anomaly']
+        predicted_anomalies.loc[anomaly_indices, 'anomalous_fields'] = top_attribute_strings.loc[anomaly_indices]
 
         if root_cause_analysis:
             logger.info('Run root cause analysis.')
             df_arcana_bias, arcana_losses, tracked_bias = self.run_root_cause_analysis(sensor_data=sensor_data,
                                                                                        track_losses=track_losses,
-                                                                                       track_bias=track_bias)
+                                                                                       track_bias=track_bias,
+                                                                                       ignore_features=configured_ignore_patterns)
         else:
             df_arcana_bias = None
            arcana_losses = None
@@ -306,6 +364,7 @@ def predict_anomaly_score(self, sensor_data: pd.DataFrame) -> pd.Series:
         x_prepped = self.data_preprocessor.transform(sensor_data)
         recon_error = self.autoencoder.get_reconstruction_error(x_prepped)
+        recon_error = self._mask_reconstruction_error(recon_error, 'anomaly scoring (direct score)')
         scores = self.anomaly_score.transform(recon_error)

         return scores
@@ -320,13 +379,15 @@ def predict_anomalies(self, scores: pd.Series, x_prepped: pd.DataFrame = None) -
         return predicted_anomalies

     def run_root_cause_analysis(self, sensor_data: pd.DataFrame, track_losses: bool = False,
-                                track_bias: bool = False) -> Tuple[pd.DataFrame, pd.DataFrame, List[pd.DataFrame]]:
+                                track_bias: bool = False, ignore_features: Optional[Iterable[str]] = None
+                                ) -> Tuple[pd.DataFrame, pd.DataFrame, List[pd.DataFrame]]:
        """Run ARCANA

        Args:
            sensor_data: pandas DataFrame containing the sensor data which should be analyzed.
            track_losses: optional bool. If True the arcana losses will be tracked over the iterations
            track_bias: optional bool. If True the arcana bias will be tracked over the iterations
+           ignore_features: optional iterable of patterns identifying features to exclude from the analysis.

        Returns: Tuple of (pd.DataFrame, pd.DataFrame, List[pd.DataFrame])
            df_arcana_bias: pandas dataframe containing the arcana bias.
@@ -335,15 +396,36 @@ def run_root_cause_analysis(self, sensor_data: pd.DataFrame, track_losses: bool
        """
         x_prepped = self.data_preprocessor.transform(sensor_data)

-        if self.config is None:
-            # backwards compatibility, old models did not save config, just use default parameters
-            rca = Arcana(model=self.autoencoder)
+        # Backwards compatibility: old models did not save a config, so fall back to Arcana defaults.
+        arcana_kwargs: Dict[str, Any] = {}
+        if self.config is not None:
+            arcana_kwargs.update(self.config.arcana_params)
+
+        configured_ignore_patterns: Tuple[str, ...]
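+        # An explicit ``ignore_features`` argument takes precedence over patterns stored in the config.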
+        if ignore_features is not None:
+            configured_ignore_patterns = tuple(ignore_features)
         else:
-            rca = Arcana(model=self.autoencoder, **self.config.arcana_params)
+            configured_ignore_patterns = tuple(
+                arcana_kwargs.get('ignore_features') or self._ignore_feature_patterns
+            )
+
+        if configured_ignore_patterns:
+            arcana_kwargs['ignore_features'] = list(configured_ignore_patterns)
+
+        rca = Arcana(model=self.autoencoder, **arcana_kwargs)

         df_arcana_bias, arcana_losses, tracked_bias = rca.find_arcana_bias(x=x_prepped, track_losses=track_losses,
                                                                            track_bias=track_bias)
         return df_arcana_bias, arcana_losses, tracked_bias

     def _fit_threshold(self, x: pd.DataFrame, y: pd.Series, x_val: pd.DataFrame, fit_on_validation: bool = False
@@ -353,8 +435,11 @@ def _fit_threshold(self, x: pd.DataFrame, y: pd.Series, x_val: pd.DataFrame, fit
         # Fit score object only on normal data (all training + validation data)
         x_prepped_all = self.data_preprocessor.transform(x)
         deviations = self.autoencoder.get_reconstruction_error(x_prepped_all)
+        deviations = self._mask_reconstruction_error(deviations, 'anomaly scoring (training)')
         y_ = y.loc[deviations.index]
-        self.anomaly_score.fit(deviations[y_.values])  # use series values for compatibility with a multi-index
+        normal_mask = y_.fillna(False).astype(bool)
+        normal_indices = normal_mask.index[normal_mask.to_numpy()]
+        self.anomaly_score.fit(deviations.loc[normal_indices])
         scores = self.anomaly_score.transform(deviations)

         if fit_on_validation:
@@ -365,6 +450,7 @@ def _fit_threshold(self, x: pd.DataFrame, y: pd.Series, x_val: pd.DataFrame, fit
             x_val_all = x_prepped_all.sort_index().loc[x_val.index.min():]  # including known anomalies
             re_val = self.autoencoder.get_reconstruction_error(x_val_all)
+            re_val = self._mask_reconstruction_error(re_val, 'anomaly scoring (validation)')
             scores = self.anomaly_score.transform(re_val)

         logger.info('Fit threshold.')
@@ -375,3 +461,41 @@ def _fit_threshold(self, x: pd.DataFrame, y: pd.Series, x_val: pd.DataFrame, fit
                                         normal_index=y.loc[scores.index])
         else:
             self.threshold_selector.fit(x=scores, y=y.loc[scores.index])
+
+    @property
+    def _ignore_feature_patterns(self) -> Tuple[str, ...]:
+        if self.config is None:
+            return tuple()
+        return tuple(self.config.arcana_params.get('ignore_features', []))
+
+    def _mask_reconstruction_error(
+        self,
+        recon_error: pd.DataFrame,
+        context: str,
+        ignore_feature_patterns: Optional[Iterable[str]] = None,
+    ) -> pd.DataFrame:
+        """Zero-out ignored feature columns before scoring."""
+        if ignore_feature_patterns is None:
+            patterns = self._ignore_feature_patterns
+        else:
+            patterns = tuple(ignore_feature_patterns)
+
+        if not patterns:
+            return recon_error
+
+        masked, ignored_columns, unmatched = mask_ignored_features(recon_error, patterns)
+        if ignored_columns:
+            logger.info(
+                'Ignoring %s feature(s) for %s: %s',
+                len(ignored_columns),
+                context,
+                ', '.join(sorted(ignored_columns))
+            )
+        if unmatched:
+            logger.warning(
+                'Configured features to ignore not found in %s: %s',
+                context,
+                ', '.join(sorted(unmatched))
+            )
+        return masked
diff --git a/energy_fault_detector/main.py b/energy_fault_detector/main.py
index 3f379dd..d57314d 100644
--- a/energy_fault_detector/main.py
+++ b/energy_fault_detector/main.py
@@ -92,6 +92,19 @@ def main():
         help='Path to a directory where results will be saved.',
         default='results'
     )
+    parser.add_argument(
+        '--mode',
+        
type=str, + choices=['train', 'predict'], + default='train', + help="Run in 'train' mode to fit a new model or 'predict' to load an existing one." + ) + parser.add_argument( + '--model_path', + type=str, + help='Path to a previously saved model directory (required for predict mode).', + default=None + ) parser.add_argument( '--c2c_example', action='store_true', @@ -107,6 +120,7 @@ def main(): logger.info(f"Options YAML: {args.options}") logger.info(f"Results Directory: {args.results_dir}") + logger.info(f"Execution Mode: {args.mode}") os.makedirs(args.results_dir, exist_ok=True) options = Options() # Initialize with default values @@ -117,10 +131,13 @@ def main(): print(options) + if args.mode == 'predict' and args.model_path is None: + parser.error('--model_path must be provided when --mode is set to predict.') + # Call the quick_fault_detector function with parsed arguments try: from .quick_fault_detection import quick_fault_detector - prediction_results, event_meta_data = quick_fault_detector( + prediction_results, event_meta_data, _event_details, model_metadata = quick_fault_detector( csv_data_path=args.csv_data_path, csv_test_data_path=options.csv_test_data_path, train_test_column_name=options.train_test_column_name, @@ -135,10 +152,26 @@ def main(): enable_debug_plots=options.enable_debug_plots, min_anomaly_length=options.min_anomaly_length, save_dir=args.results_dir, + mode=args.mode, + model_path=args.model_path, ) logger.info(f'Fault detection completed. Results are saved in {args.results_dir}.') prediction_results.save(args.results_dir) - event_meta_data.to_csv(os.path.join(args.results_dir, 'events.csv'), index=False) + predicted_anomalies_path = os.path.join(args.results_dir, 'predicted_anomalies.csv') + logger.info('Prediction data stored at %s.', predicted_anomalies_path) + events_path = os.path.join(args.results_dir, 'events.csv') + event_meta_data.to_csv(events_path, index=False) + logger.info('Event metadata stored at %s.', events_path) + + anomaly_count = int(prediction_results.predicted_anomalies['anomaly'].sum()) + logger.info('Detected %d anomalous timestamps grouped into %d events.', anomaly_count, + len(event_meta_data)) + + if model_metadata is not None and model_metadata.model_path: + logger.info('Trained model saved to %s.', model_metadata.model_path) + print(f"Model saved to: {model_metadata.model_path}") + elif args.mode == 'predict' and args.model_path is not None: + logger.info('Predictions generated using model at %s.', args.model_path) except Exception as e: logger.error(f'An error occurred: {e}') diff --git a/energy_fault_detector/quick_fault_detection/data_loading.py b/energy_fault_detector/quick_fault_detection/data_loading.py index 63bd462..d78f492 100644 --- a/energy_fault_detector/quick_fault_detection/data_loading.py +++ b/energy_fault_detector/quick_fault_detection/data_loading.py @@ -141,9 +141,10 @@ def load_data(csv_data_path: str, train_test_column_name: Optional[str] = None, split is performed if train_test_column_name is not None. Args: - csv_data_path (str): Path to a csv-file containing tabular data which must contain training data for the - autoencoder. This data can also contain test data for evaluation, but in this case train_test_column and - optionally train_test_mapping must be provided. + csv_data_path (Optional str): Path to a csv-file containing tabular data which must contain training data for + the autoencoder. 
This data can also contain test data for evaluation, but in this case train_test_column and + optionally train_test_mapping must be provided. When ``None`` the training data is skipped, which is useful + for pure prediction runs with a pre-trained model. train_test_column_name (Optional str): Name of the column which specifies which part of the data in csv_data_path is training data and which is test data. If this column does not contain boolean values or values which can be cast into boolean values, then train_test_mapping must be provided. Default is None. @@ -172,6 +173,7 @@ def load_data(csv_data_path: str, train_test_column_name: Optional[str] = None, # train test data split assumes that True stands for training data and False for prediction data train_test_split = get_boolean_feature(df=df, bool_data_column_name=train_test_column_name, boolean_mapping=train_test_mapping) + df = df.drop(columns=['id'], errors='ignore') sensor_data = get_sensor_data(df=df) normal_index = get_boolean_feature(df=df, bool_data_column_name=status_data_column_name, boolean_mapping=status_mapping) @@ -190,7 +192,7 @@ def load_data(csv_data_path: str, train_test_column_name: Optional[str] = None, return train_data, normal_index, test_data -def load_train_test_data(csv_data_path: str, csv_test_data_path: Optional[str] = None, +def load_train_test_data(csv_data_path: Optional[str], csv_test_data_path: Optional[str] = None, train_test_column_name: Optional[str] = None, train_test_mapping: Optional[dict] = None, time_column_name: Optional[str] = None, status_data_column_name: Optional[str] = None, status_mapping: Optional[dict] = None @@ -200,9 +202,9 @@ def load_train_test_data(csv_data_path: str, csv_test_data_path: Optional[str] = If no test data is provided an exception is raised. Args: - csv_data_path (str): Path to a csv-file containing tabular data which must contain training data for the + csv_data_path (Optional str): Path to a csv-file containing tabular data which must contain training data for the autoencoder. This data can also contain test data for evaluation, but in this case train_test_column and - optionally train_test_mapping must be provided. + optionally train_test_mapping must be provided. When ``None`` only the dedicated prediction data is used. csv_test_data_path (Optional str): Path to a csv file containing test data for evaluation. If test data is provided in both ways (i.e. via csv_test_data_path and in csv_data_path + train_test_column) then both test data sets will be fused into one. Default is None. @@ -225,21 +227,28 @@ def load_train_test_data(csv_data_path: str, csv_test_data_path: Optional[str] = booleans or at least castable to booleans. Default is None. Returns: tuple - train_data (pd.DataFrame): Contains training data for the AnomalyDetector (only numeric values). + train_data (pd.DataFrame): Contains training data for the AnomalyDetector (only numeric values). When no + training dataset is provided, this will be an empty dataframe with the same columns as the prediction data. train_normal_index (pd.Series): Contains boolean information about which rows of train_data are normal and which contain anomalous behavior. test_data (pd.DataFrame): Contains test data for the AnomalyDetector (only numeric values). 
""" - train_data, train_normal_index, test_data = load_data(csv_data_path=csv_data_path, - time_column_name=time_column_name, - status_data_column_name=status_data_column_name, - status_mapping=status_mapping, - train_test_column_name=train_test_column_name, - train_test_mapping=train_test_mapping) - # normal index for prediction data can be ignored + if csv_data_path is not None: + train_data, train_normal_index, test_data = load_data(csv_data_path=csv_data_path, + time_column_name=time_column_name, + status_data_column_name=status_data_column_name, + status_mapping=status_mapping, + train_test_column_name=train_test_column_name, + train_test_mapping=train_test_mapping) + else: + train_data = pd.DataFrame() + train_normal_index = pd.Series(dtype=bool) + test_data = None + if csv_test_data_path is not None: - separated_test_data, _, _ = load_data(csv_data_path=csv_test_data_path, time_column_name=time_column_name, - status_data_column_name=None) + separated_df = read_csv_file(csv_data_path=csv_test_data_path, time_column_name=time_column_name) + separated_df = separated_df.drop(columns=['id'], errors='ignore') + separated_test_data = get_sensor_data(df=separated_df) else: separated_test_data = None @@ -252,4 +261,9 @@ def load_train_test_data(csv_data_path: str, csv_test_data_path: Optional[str] = test_data = pd.concat([test_data, separated_test_data]) elif separated_test_data is not None: test_data = separated_test_data + if test_data is not None: + test_data = test_data.drop(columns=['id'], errors='ignore') + if train_data.empty and test_data is not None: + train_data = test_data.iloc[0:0] + train_normal_index = pd.Series(dtype=bool, index=train_data.index) return train_data, train_normal_index, test_data diff --git a/energy_fault_detector/quick_fault_detection/output.py b/energy_fault_detector/quick_fault_detection/output.py index d249aab..99d8959 100644 --- a/energy_fault_detector/quick_fault_detection/output.py +++ b/energy_fault_detector/quick_fault_detection/output.py @@ -50,8 +50,15 @@ def generate_output_plots(anomaly_detector: FaultDetector, train_data: pd.DataFr axs[0, 0].axvspan(event_meta_data.iloc[i]['start'], event_meta_data.iloc[i]['end'], alpha=0.1, color='red') axs[0, 1].set_title('Anomaly Score During Training') - viz.plot_score_with_threshold(model=anomaly_detector, data=train_data, normal_index=normal_index, ax=axs[0, 1]) - axs[0, 1].set_yscale('log') + if train_data is not None and not train_data.empty: + viz.plot_score_with_threshold(model=anomaly_detector, data=train_data, normal_index=normal_index, + ax=axs[0, 1]) + axs[0, 1].set_yscale('log') + else: + axs[0, 1].text(0.5, 0.5, 'No training data available.', + ha='center', va='center', fontsize=14, + bbox=dict(boxstyle='round,pad=0.5', facecolor='white', edgecolor='black', linewidth=1.5)) + axs[0, 1].set_axis_off() viz.plot_learning_curve(anomaly_detector, ax=axs[1, 0]) axs[1, 0].set_title('Model Learning curve') diff --git a/energy_fault_detector/quick_fault_detection/quick_fault_detector.py b/energy_fault_detector/quick_fault_detection/quick_fault_detector.py index d226341..6f672e3 100644 --- a/energy_fault_detector/quick_fault_detection/quick_fault_detector.py +++ b/energy_fault_detector/quick_fault_detection/quick_fault_detector.py @@ -2,15 +2,21 @@ import os import logging -from typing import List, Optional, Tuple +from pathlib import Path +from typing import List, Optional, Tuple, Dict, Any +try: + from typing import Literal +except ImportError: # pragma: no cover - for Python<3.8 compatibility + from 
    typing_extensions import Literal

 import pandas as pd

 from energy_fault_detector._logs import setup_logging
+from energy_fault_detector.config import Config
 from energy_fault_detector.fault_detector import FaultDetector
 from energy_fault_detector.utils.analysis import create_events
 from energy_fault_detector.root_cause_analysis.arcana_utils import calculate_mean_arcana_importances
-from energy_fault_detector.core.fault_detection_result import FaultDetectionResult
+from energy_fault_detector.core.fault_detection_result import FaultDetectionResult, ModelMetadata
 from energy_fault_detector.quick_fault_detection.data_loading import load_train_test_data
 from energy_fault_detector.quick_fault_detection.configuration import select_config
@@ -20,15 +26,29 @@

 logger = logging.getLogger('energy_fault_detector')


-def quick_fault_detector(csv_data_path: str, csv_test_data_path: Optional[str] = None,
+def quick_fault_detector(csv_data_path: Optional[str], csv_test_data_path: Optional[str] = None,
                          train_test_column_name: Optional[str] = None, train_test_mapping: Optional[dict] = None,
                          time_column_name: Optional[str] = None, status_data_column_name: Optional[str] = None,
                          status_mapping: Optional[dict] = None,
                          status_label_confidence_percentage: Optional[float] = 0.95,
                          features_to_exclude: Optional[List[str]] = None, angle_features: Optional[List[str]] = None,
                          automatic_optimization: bool = True, enable_debug_plots: bool = False,
-                         min_anomaly_length: int = 18, save_dir: Optional[str] = None
-                         ) -> Tuple[FaultDetectionResult, pd.DataFrame]:
+                         min_anomaly_length: int = 18, save_dir: Optional[str] = None,
+                         mode: Literal['train', 'predict'] = 'train',
+                         model_path: Optional[str] = None,
+                         model_directory: Optional[str] = None,
+                         model_subdir: Optional[str] = None,
+                         model_name: Optional[str] = None,
+                         asset_name: Optional[str] = None,
+                         rca_ignore_features: Optional[List[str]] = None
+                         ) -> Tuple[FaultDetectionResult, pd.DataFrame, List[Dict[str, Any]],
+                                    Optional[ModelMetadata]]:
    """Analyzes provided data using an autoencoder based approach for identifying anomalies based on a learned
    normal behavior. Anomalies are then aggregated to events and further analyzed. Runs the entire fault detection
    module chain in one function call. Sections of this function call are:
@@ -41,9 +61,10 @@ def quick_fault_detector(csv_data_path: str, csv_test_data_path: Optional[str] =
        7. Visualization of output

    Args:
-        csv_data_path (str): Path to a csv-file containing tabular data which must contain training data for the
-            autoencoder. This data can also contain test data for evaluation, but in this case train_test_column and
-            optionally train_test_mapping must be provided.
+        csv_data_path (Optional[str]): Path to a csv-file containing tabular data which must contain training data for
+            the autoencoder. When running in prediction mode with a pre-trained model, this can be ``None`` to skip
+            loading training data. This data can also contain test data for evaluation, but in this case
+            ``train_test_column`` and optionally ``train_test_mapping`` must be provided.
        csv_test_data_path (Optional str): Path to a csv file containing test data for evaluation. If test data is
            provided in both ways (i.e. via csv_test_data_path and in csv_data_path + train_test_column) then both
            test data sets will be fused into one. Default is None.
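For orientation while reading this hunk, here is a minimal usage sketch of the reworked entry point. The file names and the `'*_std'` pattern are hypothetical; the keyword arguments and the four-element return are the ones introduced above:

```python
from energy_fault_detector.quick_fault_detection import quick_fault_detector

# Train once; the returned metadata records where the model artefacts were written.
_, _, _, metadata = quick_fault_detector(
    csv_data_path='train.csv',         # hypothetical input file
    time_column_name='time_stamp',
    mode='train',
    model_name='my_model',
)

# Score new data with the saved model; no training data is required in predict mode.
results, events, event_details, _ = quick_fault_detector(
    csv_data_path=None,
    csv_test_data_path='predict.csv',  # hypothetical input file
    time_column_name='time_stamp',
    mode='predict',
    model_path=metadata.model_path,
    rca_ignore_features=['*_std'],     # optional wildcard exclusions for ARCANA
)
```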
@@ -81,9 +102,25 @@ def quick_fault_detector(csv_data_path: str, csv_test_data_path: Optional[str] = the FaultDetector threshold) to define an anomaly event. save_dir (Optional[str]): Directory to save the output plots. If not provided, the plots are not saved. Defaults to None. + mode (Literal['train', 'predict']): Determines whether the detector should train a new model or load an + existing one for prediction. Defaults to 'train'. + model_path (Optional[str]): Path to a previously saved model directory. Required when mode='predict'. + model_directory (Optional[str]): Directory where trained model artifacts should be stored. Defaults to the + FaultDetector default when not provided. + model_subdir (Optional[str]): Optional subdirectory inside ``model_directory`` that should be used when saving + models. If not provided, a timestamp-based folder name is used. + model_name (Optional[str]): Custom directory name for the saved model artifacts. Defaults to 'trained_model'. + asset_name (Optional[str]): Identifier of the asset whose data is being analysed. When running in prediction + mode this name is used to create a dedicated folder inside ``prediction_output``. If omitted the folder name + is inferred from ``csv_test_data_path`` when possible. + rca_ignore_features (Optional[List[str]]): Additional feature names or wildcard patterns that should be ignored + by the root cause analysis module during prediction. Values provided here extend the patterns that are stored + in the persisted model configuration. Returns: - Tuple(FaultDetectionResult, pd.DataFrame): FaultDetectionResult object with the following DataFrames: + Tuple(FaultDetectionResult, pd.DataFrame, List[Dict[str, Any]], Optional[ModelMetadata]): FaultDetectionResult + object with the + following DataFrames: - predicted_anomalies: DataFrame with a column 'anomaly' (bool). - reconstruction: DataFrame with reconstruction of the sensor data with timestamp as index. @@ -93,10 +130,25 @@ def quick_fault_detector(csv_data_path: str, csv_test_data_path: Optional[str] = - arcana_losses: DataFrame containing recorded values for all losses in ARCANA. None if ARCANA was not run. - tracked_bias: List of DataFrames. None if ARCANA was not run. - and the detected anomaly events as dataframe. + and the detected anomaly events as dataframe. When run in prediction mode, all DataFrames contained in the + ``FaultDetectionResult`` are additionally persisted as CSV files under ``prediction_output/``. The + second return value contains event metadata while the third element of the tuple holds a list of dictionaries + with detailed information for every detected event (sensor data slices and ARCANA summaries). + + When run in training mode, the tuple additionally contains ModelMetadata with information about the saved + model. In prediction mode, the metadata element is None. """ - logger.info('Starting Automated Energy Fault Detection and Identification.') + if mode not in {'train', 'predict'}: + raise ValueError(f"Unsupported mode '{mode}'. 
Please choose 'train' or 'predict'.")
+
+    if mode == 'predict' and model_path is None:
+        raise ValueError('`model_path` must be provided when running in predict mode.')
+
+    logger.info('Starting Automated Energy Fault Detection and Identification (mode=%s).', mode)
     logger.info('Loading Data...')
+    if csv_data_path is None and csv_test_data_path is None:
+        raise ValueError('At least one data source must be provided via `csv_data_path` or `csv_test_data_path`.')
+
     train_data, train_normal_index, test_data = load_train_test_data(csv_data_path=csv_data_path,
                                                                      csv_test_data_path=csv_test_data_path,
                                                                      train_test_column_name=train_test_column_name,
@@ -104,23 +156,69 @@ def quick_fault_detector(csv_data_path: str, csv_test_data_path: Optional[str] =
                                                                      time_column_name=time_column_name,
                                                                      status_data_column_name=status_data_column_name,
                                                                      status_mapping=status_mapping)
-    logger.info('Selecting suitable config...')
-    config = select_config(train_data=train_data, normal_index=train_normal_index,
-                           status_label_confidence_percentage=status_label_confidence_percentage,
-                           features_to_exclude=features_to_exclude, angles=angle_features,
-                           automatic_optimization=automatic_optimization)
-    logger.info('Training a Normal behavior model.')
-    anomaly_detector = FaultDetector(config=config)
-    anomaly_detector.fit(sensor_data=train_data, normal_index=train_normal_index)
+    model_metadata: Optional[ModelMetadata] = None
+
+    if mode == 'train':
+        logger.info('Selecting suitable config...')
+        config = select_config(train_data=train_data, normal_index=train_normal_index,
+                               status_label_confidence_percentage=status_label_confidence_percentage,
+                               features_to_exclude=features_to_exclude, angles=angle_features,
+                               automatic_optimization=automatic_optimization)
+        logger.info('Training a Normal behavior model.')
+        fault_detector_kwargs = {}
+        if model_directory is not None:
+            fault_detector_kwargs['model_directory'] = model_directory
+        if model_subdir is not None:
+            fault_detector_kwargs['model_subdir'] = model_subdir
+        anomaly_detector = FaultDetector(config=config, **fault_detector_kwargs)
+        model_metadata = anomaly_detector.fit(sensor_data=train_data, normal_index=train_normal_index,
+                                              model_name=model_name)
+        if model_metadata.model_path:
+            logger.info('Saved trained model to %s.', model_metadata.model_path)
+        else:
+            logger.info('Model was trained but not saved to disk.')
+        root_cause_analysis = False
+    else:
+        logger.info('Loading pre-trained model from %s.', model_path)
+        fallback_config_path = Path(__file__).resolve().parent.parent / 'base_config.yaml'
+        fallback_config = Config(str(fallback_config_path))
+        if rca_ignore_features:
+            # Merge caller-supplied patterns with those already present in the fallback config.
+            rca_config = fallback_config.config_dict.setdefault('root_cause_analysis', {})
+            existing_patterns = rca_config.get('ignore_features', []) or []
+            merged_patterns = list(dict.fromkeys([*existing_patterns, *rca_ignore_features]))
+            rca_config['ignore_features'] = merged_patterns
+        anomaly_detector = FaultDetector(config=fallback_config)
+        anomaly_detector.load_models(model_path=model_path)
+        root_cause_analysis = True

     logger.info('Evaluating Test data based on the learned normal behavior.')
-    prediction_results = anomaly_detector.predict(sensor_data=test_data, root_cause_analysis=False)
-    anomalies = prediction_results.predicted_anomalies['anomaly']
+    prediction_results = 
anomaly_detector.predict(sensor_data=test_data, root_cause_analysis=root_cause_analysis) + predicted_anomalies = prediction_results.predicted_anomalies.copy() + anomalies = predicted_anomalies['anomaly'] # Find anomaly events event_meta_data, event_data_list = create_events(sensor_data=test_data, boolean_information=anomalies, min_event_length=min_anomaly_length) - arcana_mean_importance_list = [] - arcana_loss_list = [] + event_ids = list(range(1, len(event_data_list) + 1)) + if not event_meta_data.empty: + event_meta_data = event_meta_data.copy() + event_meta_data.insert(0, 'event_id', event_ids) + event_meta_data['critical_event'] = True + + predicted_anomalies['event_id'] = pd.Series(pd.NA, index=predicted_anomalies.index, dtype='Int64') + predicted_anomalies['critical_event'] = False + + for event_id, event_data in zip(event_ids, event_data_list): + event_index = event_data.index + predicted_anomalies.loc[event_index, 'event_id'] = event_id + predicted_anomalies.loc[event_index, 'critical_event'] = True + + prediction_results.predicted_anomalies = predicted_anomalies + arcana_mean_importance_list: List[pd.Series] = [] + arcana_loss_list: List[Optional[pd.DataFrame]] = [] for i in range(len(event_meta_data)): logger.info(f'Analyzing anomaly events ({i + 1} of {len(event_meta_data)}).') event_data = event_data_list[i] @@ -129,15 +227,35 @@ def quick_fault_detector(csv_data_path: str, csv_test_data_path: Optional[str] = event_data=event_data, track_losses=enable_debug_plots) arcana_mean_importance_list.append(arcana_mean_importances) - if len(arcana_losses) > 0: + if arcana_losses is not None and not arcana_losses.empty: arcana_loss_list.append(arcana_losses) + else: + arcana_loss_list.append(None) logger.info('Generating Output Graphics.') logger.info(output_info) generate_output_plots(anomaly_detector=anomaly_detector, train_data=train_data, normal_index=train_normal_index, test_data=test_data, arcana_losses=arcana_loss_list, arcana_mean_importances=arcana_mean_importance_list, event_meta_data=event_meta_data, save_dir=save_dir) - return prediction_results, event_meta_data + if mode == 'predict': + _save_prediction_results( + prediction_results=prediction_results, + csv_test_data_path=csv_test_data_path, + save_dir=save_dir, + asset_name=asset_name, + ) + + event_analysis: List[Dict[str, Any]] = [] + for event_id, event_data, importances, losses in zip(event_ids, event_data_list, + arcana_mean_importance_list, arcana_loss_list): + event_analysis.append({ + 'event_id': event_id, + 'event_data': event_data, + 'arcana_mean_importances': importances, + 'arcana_losses': losses, + }) + + return prediction_results, event_meta_data, event_analysis, model_metadata def analyze_event(anomaly_detector: FaultDetector, event_data: pd.DataFrame, track_losses: bool @@ -158,3 +276,30 @@ def analyze_event(anomaly_detector: FaultDetector, event_data: pd.DataFrame, tra track_bias=False) importances_mean = calculate_mean_arcana_importances(bias_data=bias) return importances_mean, tracked_losses + + +def _save_prediction_results(prediction_results: FaultDetectionResult, + csv_test_data_path: Optional[str], + save_dir: Optional[str], + asset_name: Optional[str]) -> None: + """Persist prediction artefacts to disk when running in predict mode.""" + + base_directory: Path + if save_dir is not None: + base_directory = Path(save_dir) + elif csv_test_data_path is not None: + base_directory = Path(csv_test_data_path).resolve().parent + else: + base_directory = Path.cwd() + + inferred_asset_name: str + if 
asset_name:
+        inferred_asset_name = asset_name
+    elif csv_test_data_path is not None:
+        csv_path = Path(csv_test_data_path)
+        inferred_asset_name = csv_path.resolve().parent.name or csv_path.stem
+    else:
+        inferred_asset_name = 'asset'
+
+    output_directory = base_directory / 'prediction_output' / inferred_asset_name
+    prediction_results.save(str(output_directory))
diff --git a/energy_fault_detector/root_cause_analysis/arcana.py b/energy_fault_detector/root_cause_analysis/arcana.py
index 3381beb..7888279 100644
--- a/energy_fault_detector/root_cause_analysis/arcana.py
+++ b/energy_fault_detector/root_cause_analysis/arcana.py
@@ -2,7 +2,7 @@

 import logging

-from typing import Tuple, List
+from typing import Tuple, List, Optional, Set

 import numpy as np
 import pandas as pd
@@ -13,6 +13,7 @@
 from tensorflow.keras.optimizers import Adam

 from energy_fault_detector.core import Autoencoder
+from energy_fault_detector.utils.feature_filters import resolve_ignored_columns

 logger = logging.getLogger('energy_fault_detector')
@@ -72,7 +73,7 @@ class Arcana:

     def __init__(self, model: AE_TYPE, learning_rate: float = 0.001, init_x_bias: str = 'recon',
                  alpha: float = 0.8, num_iter: int = 400, epsilon: float = 1e-8, verbose: bool = False,
-                 max_sample_threshold: int = 1000, **kwargs):
+                 max_sample_threshold: int = 1000, ignore_features: Optional[List[str]] = None, **kwargs):

         self.keras_model: AE_TYPE = model
@@ -89,6 +90,9 @@ def __init__(self, model: AE_TYPE, learning_rate: float = 0.001, init_x_bias: st
         self.num_iter: int = num_iter
         self.verbose: bool = verbose
         self.max_sample_threshold = max_sample_threshold
+        self.ignore_features: Tuple[str, ...] = tuple(ignore_features or [])
+        self._feature_mask: Optional[tf.Tensor] = None
+        self._ignored_columns: Set[str] = set()

     def find_arcana_bias(self, x: pd.DataFrame, track_losses: bool = False, track_bias: bool = False
                          ) -> Tuple[pd.DataFrame, pd.DataFrame, List[pd.DataFrame]]:
@@ -111,6 +115,9 @@ def find_arcana_bias(self, x: pd.DataFrame, track_losses: bool = False, track_bi
         """
         feature_names = x.columns
+        self._feature_mask = self._build_feature_mask(feature_names)
         timestamps = x.index
         x = x.values.astype('float32')
@@ -120,6 +127,7 @@ def find_arcana_bias(self, x: pd.DataFrame, track_losses: bool = False, track_bi
         x_bias = self.initialize_x_bias(x)
         x_bias = tf.Variable(x_bias, dtype=tf.float32)
+        self._apply_feature_mask(x_bias)

         tracked_losses = {'Combined Loss': [], 'Reconstruction Loss': [], 'Regularization Loss': [], 'Iteration': []}
         bias = x_bias.numpy()
@@ -127,6 +135,7 @@ def find_arcana_bias(self, x: pd.DataFrame, track_losses: bool = False, track_bi
         tracked_bias = [bias]
         for i in range(self.num_iter):
             x_bias, losses, _ = self.update_x_bias(x, x_bias)
+            self._apply_feature_mask(x_bias)
             if i % 50 == 0:
                 loss_1, loss_2, combined_loss = losses[0].numpy(), losses[1].numpy(), losses[2].numpy()
                 if track_losses:
@@ -141,11 +150,15 @@ def find_arcana_bias(self, x: pd.DataFrame, track_losses: bool = False, track_bi
                 logger.info('%d Combined Loss: %.2f', i, combined_loss)

         x_bias = pd.DataFrame(data=x_bias.numpy(), columns=feature_names, index=timestamps)
+        self._apply_feature_mask_to_dataframe(x_bias)
         # return x_bias as a pandas DataFrame
         tracked_losses = pd.DataFrame(tracked_losses)
         tracked_losses = tracked_losses.set_index('Iteration')
         tracked_bias_dfs = [pd.DataFrame(data=bias, columns=feature_names, index=timestamps) for bias in tracked_bias]
+        for bias_df in tracked_bias_dfs:
+            self._apply_feature_mask_to_dataframe(bias_df)
+        self._feature_mask = None
         return x_bias, tracked_losses, tracked_bias_dfs

     def draw_samples(self, x: np.array) -> np.array:
@@ -215,5 +228,58 @@ def update_x_bias(self, x: tf.Variable, x_bias: tf.Variable) -> BIAS_RETURN_TYPE
         # differentiate w.r.t. x_bias
         grad = grad_tape.gradient(loss_full, x_bias)
+        if self._feature_mask is not None:
+            grad = grad * self._feature_mask
         self.opt.apply_gradients(zip([grad], [x_bias]))
+        if self._feature_mask is not None:
+            x_bias.assign(x_bias * self._feature_mask)
         return x_bias, (loss_1, loss_2, loss_full), grad
+
+    def _build_feature_mask(self, feature_names: pd.Index) -> Optional[tf.Tensor]:
+        """Create mask to zero gradients for ignored features."""
+        if not self.ignore_features:
+            self._ignored_columns = set()
+            return None
+
+        mask = np.ones((1, len(feature_names)), dtype='float32')
+        ignored_columns, unmatched = resolve_ignored_columns(feature_names, self.ignore_features)
+
+        if ignored_columns:
+            logger.info(
+                'Ignoring %s feature(s) during ARCANA optimisation: %s',
+                len(ignored_columns),
+                ', '.join(sorted(ignored_columns))
+            )
+            for idx, name in enumerate(feature_names):
+                if name in ignored_columns:
+                    mask[0, idx] = 0.0
+
+        if unmatched:
+            logger.warning(
+                'Configured features to ignore not found in input data: %s',
+                ', '.join(sorted(unmatched))
+            )
+
+        self._ignored_columns = set(ignored_columns)
+
+        if np.all(mask == 1.0):
+            return None
+
+        return tf.constant(mask, dtype=tf.float32)
+
+    def _apply_feature_mask(self, x_bias: tf.Variable) -> None:
+        """Ensure ignored features remain unchanged."""
+        if self._feature_mask is not None:
+            x_bias.assign(x_bias * self._feature_mask)
+
+    def _apply_feature_mask_to_dataframe(self, df: pd.DataFrame) -> None:
+        """Zero-out ignored feature values in a DataFrame result."""
+        if not self._ignored_columns:
+            return
+
+        intersection = self._ignored_columns.intersection(df.columns)
+        if intersection:
+            df.loc[:, list(intersection)] = 0.0
diff --git a/energy_fault_detector/threshold_selectors/quantile_threshold.py b/energy_fault_detector/threshold_selectors/quantile_threshold.py
index f413e1a..c8e0a3b 100644
--- a/energy_fault_detector/threshold_selectors/quantile_threshold.py
+++ b/energy_fault_detector/threshold_selectors/quantile_threshold.py
@@ -54,14 +54,52 @@ def fit(self, x: Array1D, y: pd.Series = None) -> 'QuantileThresholdSelector':
         """
         if isinstance(x, pd.Series):
-            x = x.sort_index().values
-        if isinstance(y, pd.Series):
-            y = y.sort_index().values
+            x_series = x.sort_index()
+        else:
+            x_series = pd.Series(np.asarray(x))
+
+        selected_scores: Union[pd.Series, np.ndarray]

         if y is not None:
-            x_ = x[y]
+            if isinstance(y, pd.DataFrame):
+                if y.shape[1] != 1:
+                    raise ValueError('QuantileThresholdSelector requires a one-dimensional normal index.')
+                y_series = y.iloc[:, 0]
+            elif isinstance(y, pd.Series):
+                y_series = y
+            else:
+                y_array = np.asarray(y)
+                if y_array.ndim != 1:
+                    raise ValueError('QuantileThresholdSelector requires a one-dimensional normal index.')
+                if y_array.shape[0] != len(x_series):
+                    raise ValueError('Boolean index must be the same length as the anomaly scores.')
+                selected_scores = x_series[y_array.astype(bool)]
+                y_series = None
+
+        if y is not None and isinstance(y, (pd.Series, pd.DataFrame)):
+            y_series = y_series.sort_index()
+            if isinstance(x, pd.Series):
+                x_series, y_series = x_series.align(y_series, join='inner')
+            else:
+                if not y_series.index.equals(x_series.index):
+                    y_series = pd.Series(y_series.to_numpy(), index=x_series.index)
+            y_mask = y_series.fillna(False).astype(bool)
+            if len(y_mask) != len(x_series):
+                raise ValueError('Boolean index must be the same length as the anomaly scores.')
+            selected_scores = x_series[y_mask]
+
+            if isinstance(selected_scores, pd.Series) and selected_scores.empty:
+                selected_scores = x_series
         else:
-            x_ = x
+            selected_scores = x_series
+
+        if isinstance(selected_scores, pd.Series):
+            x_ = selected_scores.to_numpy(dtype=np.float64, copy=False)
+        else:
+            x_ = np.asarray(selected_scores, dtype=np.float64)
+
+        if x_.size == 0:
+            x_ = x_series.to_numpy(dtype=np.float64, copy=False)

         self.threshold = float(np.quantile(x_, self.quantile))
diff --git a/energy_fault_detector/utils/analysis.py b/energy_fault_detector/utils/analysis.py
index c45a44d..52b2cb1 100644
--- a/energy_fault_detector/utils/analysis.py
+++ b/energy_fault_detector/utils/analysis.py
@@ -22,6 +22,25 @@ def create_events(sensor_data: pd.DataFrame, boolean_information: pd.Series,
         - event_meta_data (pd.DataFrame): A DataFrame with columns 'start', 'end', and 'duration' for each event.
         - event_data (List[pd.DataFrame]): A list of DataFrames corresponding to the sensor data during the defined events.
     """
+    # Ensure the boolean information uses the same index as the sensor data. When running predictions on
+    # different assets we encountered cases where the boolean series used a different index. Pandas silently
+    # reindexes boolean masks during ``DataFrame.__getitem__`` which, in this case, resulted in out of bounds
+    # indices and raised an ``IndexError``. Aligning the series to the sensor data index avoids the
+    # misalignment and guarantees a pure boolean mask of equal length. Duplicate timestamps in the sensor
+    # data are collapsed by averaging before the alignment.
+    sensor_data = sensor_data.groupby(level=0).mean()
+    if boolean_information.index.has_duplicates:
+        # ``Series.reindex`` does not support duplicate indices. Duplicate timestamps can appear when
+        # predictions are generated from concatenated batches. In this context we treat any duplicate
+        # timestamp as an event whenever at least one of the values is ``True``.
+        boolean_information = boolean_information.groupby(level=0).max()
+    boolean_information = boolean_information.reindex(sensor_data.index, fill_value=False)
+
     # Create a boolean mask for consecutive True values
     mask = (boolean_information != boolean_information.shift()).cumsum()
diff --git a/energy_fault_detector/utils/feature_filters.py b/energy_fault_detector/utils/feature_filters.py
new file mode 100644
index 0000000..8ee17de
--- /dev/null
+++ b/energy_fault_detector/utils/feature_filters.py
@@ -0,0 +1,44 @@
+"""Utilities for handling feature exclusion patterns."""
+
+from fnmatch import fnmatch
+from typing import Iterable, Set, Tuple
+
+import pandas as pd
+
+
+def resolve_ignored_columns(columns: Iterable[str], patterns: Iterable[str]) -> Tuple[Set[str], Set[str]]:
+    """Resolve wildcard patterns against column names.
+
+    Matching is case-insensitive to better support configuration values that
+    might not exactly match the column name casing. 
This avoids situations + where an ignored feature listed in the configuration (for example + ``power_58_avg``) would still appear in ARCANA results because the actual + column name differs only by letter case (such as ``Power_58_Avg``). + """ + + matched_columns: Set[str] = set() + matched_patterns: Set[str] = set() + + normalised_patterns = [(pattern, pattern.lower()) for pattern in patterns] + + for name in columns: + lower_name = name.lower() + for pattern, pattern_lower in normalised_patterns: + if fnmatch(name, pattern) or fnmatch(lower_name, pattern_lower): + matched_columns.add(name) + matched_patterns.add(pattern) + break + + unmatched_patterns = set(patterns) - matched_patterns + return matched_columns, unmatched_patterns + + +def mask_ignored_features(df: pd.DataFrame, patterns: Iterable[str]) -> Tuple[pd.DataFrame, Set[str], Set[str]]: + """Return a copy of *df* with ignored columns set to zero.""" + ignored_columns, unmatched_patterns = resolve_ignored_columns(df.columns, patterns) + if not ignored_columns: + return df, ignored_columns, unmatched_patterns + + masked = df.copy() + masked.loc[:, list(ignored_columns)] = 0.0 + return masked, ignored_columns, unmatched_patterns diff --git a/energy_fault_detector/utils/unsupervised_behavior_model.py b/energy_fault_detector/utils/unsupervised_behavior_model.py new file mode 100644 index 0000000..64055a3 --- /dev/null +++ b/energy_fault_detector/utils/unsupervised_behavior_model.py @@ -0,0 +1,328 @@ +"""Unsupervised behaviour modelling for wind turbine sensor data. + +This module provides a lightweight, fully unsupervised workflow that can be +used when only raw sensor logs of a single wind turbine are available. The +``UnsupervisedBehaviorModel`` learns robust per-sensor operating ranges and a +multivariate anomaly detector (Isolation Forest) that jointly flag +observations deviating from historic patterns. It can be reused for any wind +farm by passing a :class:`pandas.DataFrame` with numerical sensor columns. + +Example +------- +>>> import pandas as pd +>>> from energy_fault_detector.utils.unsupervised_behavior_model import ( +... UnsupervisedBehaviorModel, +... ) +>>> # sensor logs of a single turbine (no labels required) +>>> df = pd.DataFrame({"power": [1.1, 1.2, 5.5], "wind_speed": [7.5, 7.4, 25]}) +>>> model = UnsupervisedBehaviorModel().fit(df) +>>> model.get_normal_ranges() + lower median upper +power ... ... ... +wind_speed ... ... ... +>>> predictions = model.predict(df) +>>> predictions["is_anomaly"].tolist() +[False, False, True] + +The implementation purposely favours transparency over complex modelling so +that domain experts can inspect the inferred ranges and understand why a +record has been labelled anomalous. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from os import PathLike +from typing import Dict, List, Optional, Sequence, Tuple + +import numpy as np +import pandas as pd +from sklearn.ensemble import IsolationForest +from sklearn.preprocessing import RobustScaler + + +@dataclass(frozen=True) +class SensorRange: + """Container for the inferred normal operating range of a sensor.""" + + lower: float + upper: float + + def contains(self, values: pd.Series) -> pd.Series: + """Return a boolean series indicating which values fall inside the range.""" + + return (values >= self.lower) & (values <= self.upper) + + +class UnsupervisedBehaviorModel: + """Identify anomalous sensor behaviour without labelled data. 
+ + The model combines per-sensor robust statistics with an Isolation Forest to + flag outliers. Per-sensor ranges are derived from empirical quantiles and an + inter-quantile margin so the approach remains interpretable. The Isolation + Forest captures multivariate anomalies that may not breach individual + thresholds but still indicate abnormal behaviour when considering the + sensors jointly. + + Parameters + ---------- + contamination: + Expected fraction of anomalies in the historic data. This is forwarded + to the Isolation Forest. + quantile_bounds: + Low/high quantiles that represent typical behaviour. A Tukey-style + whisker is added on top of this interval to soften the range. + random_state: + Optional seed for deterministic Isolation Forest behaviour. + """ + + def __init__( + self, + contamination: float = 0.05, + quantile_bounds: Tuple[float, float] = (0.01, 0.99), + random_state: Optional[int] = 42, + ) -> None: + if not 0 < contamination < 0.5: + raise ValueError("`contamination` must be between 0 and 0.5.") + + low_q, high_q = quantile_bounds + if not 0.0 < low_q < high_q < 1.0: + raise ValueError( + "`quantile_bounds` must satisfy 0 < low_quantile < high_quantile < 1." + ) + + self.contamination: float = contamination + self.quantile_bounds: Tuple[float, float] = (low_q, high_q) + self.random_state: Optional[int] = random_state + + self._scaler: Optional[RobustScaler] = None + self._model: Optional[IsolationForest] = None + self._feature_names: Optional[pd.Index] = None + self._normal_ranges: Dict[str, SensorRange] = {} + self._feature_medians: Optional[pd.Series] = None + + @staticmethod + def _select_numeric_features(data: pd.DataFrame) -> pd.DataFrame: + """Return only numeric columns, coercing boolean values to integers.""" + + numeric = data.select_dtypes(include=[np.number, "bool"]).copy() + for column in numeric.columns: + if numeric[column].dtype == bool: + numeric[column] = numeric[column].astype(float) + return numeric.apply(pd.to_numeric, errors="coerce") + + def fit(self, data: pd.DataFrame) -> "UnsupervisedBehaviorModel": + """Learn per-sensor ranges and fit the Isolation Forest. + + Parameters + ---------- + data: + A DataFrame containing historic sensor readings for a single wind + turbine. Only numeric columns are used. Missing values are imputed + with the column medians. + """ + + numeric_data = self._select_numeric_features(data) + if numeric_data.empty: + raise ValueError("`data` must contain at least one numeric column.") + + self._feature_names = numeric_data.columns + self._feature_medians = numeric_data.median() + + self._normal_ranges = {} + low_q, high_q = self.quantile_bounds + for column in self._feature_names: + series = numeric_data[column].dropna() + if series.empty: + # Degenerate column, fall back to the median and allow a narrow band. 
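+                # A 5% relative margin (floored at 1e-9) keeps the band non-degenerate for
+                # constant or empty sensors.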
+ median_value = self._feature_medians[column] + margin = max(abs(median_value) * 0.05, 1e-9) + self._normal_ranges[column] = SensorRange( + lower=median_value - margin, + upper=median_value + margin, + ) + continue + + low = float(series.quantile(low_q)) + high = float(series.quantile(high_q)) + iqr = high - low + if np.isclose(iqr, 0.0): + std = float(series.std(ddof=0)) + margin = 3.0 * std if not np.isclose(std, 0.0) else max( + abs(series.iloc[0]) * 0.05, + 1e-9, + ) + else: + margin = 1.5 * iqr + + self._normal_ranges[column] = SensorRange(lower=low - margin, upper=high + margin) + + filled_data = numeric_data.fillna(self._feature_medians) + self._scaler = RobustScaler().fit(filled_data) + scaled = self._scaler.transform(filled_data) + + self._model = IsolationForest( + contamination=self.contamination, + random_state=self.random_state, + ) + self._model.fit(scaled) + + return self + + @property + def feature_names(self) -> pd.Index: + if self._feature_names is None: + raise RuntimeError("The model has not been fitted yet.") + return self._feature_names + + def get_normal_ranges(self) -> pd.DataFrame: + """Return the learnt normal operating ranges for each sensor.""" + + if self._feature_medians is None: + raise RuntimeError("The model has not been fitted yet.") + + records = [] + for name in self.feature_names: + sensor_range = self._normal_ranges[name] + records.append( + { + "sensor": name, + "lower": sensor_range.lower, + "median": self._feature_medians[name], + "upper": sensor_range.upper, + } + ) + return pd.DataFrame.from_records(records).set_index("sensor") + + def _prepare_prediction_frame(self, data: pd.DataFrame) -> pd.DataFrame: + if self._feature_medians is None: + raise RuntimeError("The model has not been fitted yet.") + + numeric_data = self._select_numeric_features(data) + missing_columns = [col for col in self.feature_names if col not in numeric_data] + for column in missing_columns: + numeric_data[column] = self._feature_medians[column] + numeric_data = numeric_data[self.feature_names] + return numeric_data.fillna(self._feature_medians) + + def predict(self, data: pd.DataFrame) -> pd.DataFrame: + """Label observations as normal or anomalous. + + Parameters + ---------- + data: + DataFrame with the same sensor columns that were used during + :meth:`fit`. Additional columns are ignored. The returned DataFrame + contains a boolean ``is_anomaly`` column together with diagnostic + information. + """ + + if self._model is None or self._scaler is None: + raise RuntimeError("The model has not been fitted yet.") + + numeric_data = self._prepare_prediction_frame(data) + scaled = self._scaler.transform(numeric_data) + + anomaly_scores = -self._model.score_samples(scaled) + isolation_flags = self._model.predict(scaled) == -1 + + # Range violations per sensor. 
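+        # A row violates a sensor's range when its value falls outside the learnt [lower, upper] band.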
+ range_flags = {} + for column in self.feature_names: + sensor_range = self._normal_ranges[column] + range_flags[column] = ~sensor_range.contains(numeric_data[column]) + range_violation_df = pd.DataFrame(range_flags) + violation_counts = range_violation_df.sum(axis=1) + + def violated_columns(row: pd.Series) -> List[str]: + return row.index[row].tolist() + + violations = range_violation_df.apply(violated_columns, axis=1) + + result = pd.DataFrame(index=data.index) + result["anomaly_score"] = anomaly_scores + result["isolation_forest_flag"] = isolation_flags + result["range_violation_count"] = violation_counts + result["violated_sensors"] = violations + result["is_anomaly"] = isolation_flags | (violation_counts > 0) + + return result + + def explain(self, data: pd.DataFrame) -> pd.DataFrame: + """Return prediction results joined with the original sensor values. + + This helper simplifies reporting by concatenating ``predict`` output and + the respective sensor readings for quick inspection. + """ + + prediction = self.predict(data) + numeric_data = self._prepare_prediction_frame(data) + return pd.concat([numeric_data, prediction], axis=1) + + +def aggregate_wind_farm_data( + dataset_root: str | PathLike[str], + wind_farm: str = "B", + statistics: Optional[Sequence[str]] = None, + index_column: str = "id", +) -> pd.DataFrame: + """Aggregate Care-to-Compare data for a specific wind farm. + + The Care-to-Compare (C2C) dataset is organised per anomaly event. For + unsupervised modelling it is convenient to concatenate all prediction + segments into a single DataFrame. The function keeps only sensor columns and + drops the metadata columns. Normal/anomalous status labels are intentionally + ignored to emulate a purely unsupervised scenario. + + Parameters + ---------- + dataset_root: + Root path that contains the extracted C2C dataset. + wind_farm: + Select which wind farm ("A", "B" or "C") to load. + statistics: + Optional statistics suffixes passed to :class:`Care2CompareDataset` to + control which sensor aggregates are loaded. + index_column: + Column used as index when reading the CSV files ("id" or "time_stamp"). + """ + + from energy_fault_detector.evaluation.care2compare import Care2CompareDataset + + dataset = Care2CompareDataset(dataset_root) + frames: List[pd.DataFrame] = [] + if statistics is None: + stat_list: Optional[List[str]] = None + elif isinstance(statistics, str): + stat_list = [statistics] + else: + stat_list = list(statistics) + + for (event_data, _event_id) in dataset.iter_datasets( + wind_farm=wind_farm, test_only=True, statistics=stat_list, + index_column=index_column, + ): + sensor_data = event_data.drop( + columns=[ + column + for column in ["asset_id", "id", "time_stamp", "status_type_id", "train_test"] + if column in event_data.columns + ], + errors="ignore", + ) + frames.append(sensor_data) + + if not frames: + raise ValueError( + "No datasets were found. Verify that `dataset_root` points to the extracted Care-to-Compare data." 
+ ) + + return pd.concat(frames, axis=0) + + +__all__ = [ + "SensorRange", + "UnsupervisedBehaviorModel", + "aggregate_wind_farm_data", +] + diff --git a/pre-process.py b/pre-process.py new file mode 100644 index 0000000..d2b232c --- /dev/null +++ b/pre-process.py @@ -0,0 +1,89 @@ +import pandas as pd +import numpy as np + +def analyze_dataframe( + df: pd.DataFrame, + required_cols=None, + max_nan_frac_per_col=0.05, + min_unique_value_count=2, + max_col_zero_frac=0.99, + duplicate_value_to_nan=True, + n_max_duplicates=144, + value_to_replace=0, + imputer_strategy="mean" +): + """ + Analyze a DataFrame for preprocessing issues (without modifying it). + + Args: + df (pd.DataFrame): Input dataframe. + required_cols (list, optional): Fixed required columns. + max_nan_frac_per_col (float): Max allowed NaN fraction per column. + min_unique_value_count (int): Minimum unique values required in a column. + max_col_zero_frac (float): Max allowed fraction of zeros in numeric columns. + duplicate_value_to_nan (bool): Whether to check for long duplicate sequences. + n_max_duplicates (int): Max allowed consecutive duplicate values. + value_to_replace (int/float): Value considered for duplicate sequences. + imputer_strategy (str): Default imputation strategy for numeric NaNs. + + Returns: + dict: Report containing issues and suggestions. + """ + + if required_cols is None: + required_cols = [ + "time_stamp", "asset_id", "train_test", "train_test_bool", + "status_type_id", "status_type_bool" + ] + + # Retain only required + *_avg columns + cols_to_keep = required_cols + [c for c in df.columns if c.endswith("_avg")] + df = df[cols_to_keep] + + report = {} + + # 1. NaN fraction check + nan_fractions = df.isna().mean() + nan_violations = nan_fractions[nan_fractions > max_nan_frac_per_col] + report["high_nan_columns"] = nan_violations.to_dict() + + # 2. Unique value check + low_unique = {} + for col in df.columns: + uniq_count = df[col].nunique(dropna=True) + if uniq_count < min_unique_value_count: + low_unique[col] = uniq_count + report["low_unique_columns"] = low_unique + + # 3. Zero fraction check + zero_fractions = (df == 0).mean(numeric_only=True) + zero_violations = zero_fractions[zero_fractions > max_col_zero_frac] + report["high_zero_columns"] = zero_violations.to_dict() + + # 4. Duplicate sequence check + duplicate_report = {} + if duplicate_value_to_nan: + for col in df.columns: + if pd.api.types.is_numeric_dtype(df[col]): + consecutive_dupes = ( + (df[col] == value_to_replace) + .astype(int) + .groupby(df[col].ne(value_to_replace).cumsum()) + .cumsum() + ) + long_dupes = df[col][consecutive_dupes > n_max_duplicates] + if not long_dupes.empty: + duplicate_report[col] = long_dupes.index.tolist() + report["long_duplicate_sequences"] = duplicate_report + + # 5. 
Imputation strategy suggestion + imputer_suggestions = {} + for col in df.columns: + if df[col].isna().any(): + if pd.api.types.is_numeric_dtype(df[col]): + imputer_suggestions[col] = f"{imputer_strategy} imputation" + else: + imputer_suggestions[col] = "most_frequent imputation" + report["imputation_suggestions"] = imputer_suggestions + + return report diff --git a/requirements.txt b/requirements.txt index 7c0686a..eb36231 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,7 @@ pandas>=2.0 cerberus matplotlib>=3.9.0 optuna>=4.0 -chardet>=5.0 \ No newline at end of file +chardet>=5.0 +fastapi>=0.110,<1.0 +uvicorn>=0.23,<0.32 +requests>=2.31,<3.0 diff --git a/run.py b/run.py new file mode 100644 index 0000000..6e3a048 --- /dev/null +++ b/run.py @@ -0,0 +1,165 @@ +import pandas as pd +from sklearn.impute import SimpleImputer +from energy_fault_detector.quick_fault_detection import quick_fault_detector +from energy_fault_detector.config import Config +import tempfile +import os + +import pandas as pd +import numpy as np + +def analyze_dataframe( + df: pd.DataFrame, + required_cols=None, + max_nan_frac_per_col=0.05, + min_unique_value_count=2, + max_col_zero_frac=0.99, + duplicate_value_to_nan=True, + n_max_duplicates=144, + value_to_replace=0, + imputer_strategy="mean" +): + """ + Analyze a DataFrame for preprocessing issues (without modifying it). + + Args: + df (pd.DataFrame): Input dataframe. + required_cols (list, optional): Fixed required columns. + max_nan_frac_per_col (float): Max allowed NaN fraction per column. + min_unique_value_count (int): Minimum unique values required in a column. + max_col_zero_frac (float): Max allowed fraction of zeros in numeric columns. + duplicate_value_to_nan (bool): Whether to check for long duplicate sequences. + n_max_duplicates (int): Max allowed consecutive duplicate values. + value_to_replace (int/float): Value considered for duplicate sequences. + imputer_strategy (str): Default imputation strategy for numeric NaNs. + + Returns: + dict: Report containing issues and suggestions. + """ + + if required_cols is None: + required_cols = [ + "time_stamp", "train_test", "train_test_bool", + "status_type_id", "status_type_bool" + ] + + # Retain only required + *_avg columns + cols_to_keep = required_cols + [c for c in df.columns if c.endswith("_avg")] + df = df[cols_to_keep] + + report = {} + + # 1. NaN fraction check + nan_fractions = df.isna().mean() + nan_violations = nan_fractions[nan_fractions > max_nan_frac_per_col] + report["high_nan_columns"] = nan_violations.to_dict() + + # 2. Unique value check + low_unique = {} + for col in df.columns: + uniq_count = df[col].nunique(dropna=True) + if uniq_count < min_unique_value_count: + low_unique[col] = uniq_count + report["low_unique_columns"] = low_unique + + # 3. Zero fraction check + zero_fractions = (df == 0).mean(numeric_only=True) + zero_violations = zero_fractions[zero_fractions > max_col_zero_frac] + report["high_zero_columns"] = zero_violations.to_dict() + + # 4. Duplicate sequence check + duplicate_report = {} + if duplicate_value_to_nan: + for col in df.columns: + if pd.api.types.is_numeric_dtype(df[col]): + consecutive_dupes = ( + (df[col] == value_to_replace) + .astype(int) + .groupby(df[col].ne(value_to_replace).cumsum()) + .cumsum() + ) + long_dupes = df[col][consecutive_dupes > n_max_duplicates] + if not long_dupes.empty: + duplicate_report[col] = long_dupes.index.tolist() + # report["long_duplicate_sequences"] = duplicate_report + + # 5. 
+    imputer_suggestions = {}
+    for col in df.columns:
+        if df[col].isna().any():
+            if pd.api.types.is_numeric_dtype(df[col]):
+                imputer_suggestions[col] = f"{imputer_strategy} imputation"
+            else:
+                imputer_suggestions[col] = "most_frequent imputation"
+    # report["imputation_suggestions"] = imputer_suggestions
+
+    return df, report
+
+
+# Colab path kept for reference; the local Windows path below is the one in use.
+# farm_path = "/content/drive/MyDrive/Wind Turbine/Care Dataset/CARE_To_Compare/Wind Farm B/7.csv"
+farm_path = r"D:\Personal\Ideas\Wind turbine\CARE_To_Compare\CARE_To_Compare\Wind Farm B\asset_files\train_0.csv"
+test_file_path = r"D:\Personal\Ideas\Wind turbine\CARE_To_Compare\CARE_To_Compare\Wind Farm B\asset_files\predict_0.csv"
+output_root_path = r"D:\Personal\Ideas\Wind turbine\CARE_To_Compare\CARE_To_Compare\Wind Farm B\models"
+
+# Detect the CSV delimiter before loading the data
+import csv
+with open(farm_path, "r", encoding="utf-8", errors="ignore") as f:
+    sample = f.read(2048)  # read a small chunk
+    sniffer = csv.Sniffer()
+    try:
+        dialect = sniffer.sniff(sample)
+        delimiter = dialect.delimiter
+    except csv.Error:
+        delimiter = ";"  # fallback default
+
+# Read CSV with the detected delimiter. Do not force dtype=str here: the
+# imputation step below needs the numeric dtypes to survive parsing.
+df = pd.read_csv(farm_path, sep=delimiter)
+
+# df, report = analyze_dataframe(df)
+# df.to_csv(r"D:\Personal\Ideas\Wind turbine\CARE_To_Compare\CARE_To_Compare\Wind Farm B\asset_files\train_0_processed.csv", index=False)
+# import pprint
+# pprint.pprint(report)
+
+# Identify numeric columns
+numeric_cols = df.select_dtypes(include=["number"]).columns
+print("Numeric columns:", numeric_cols)
+df_processed = df[["time_stamp"]].copy()
+
+if len(numeric_cols) > 0:
+    df_numeric = df[numeric_cols]
+
+    # Handle missing values using SimpleImputer; keep the original index so
+    # the imputed values align with the time_stamp column.
+    imputer = SimpleImputer(strategy="mean")
+    df_imputed_numeric = pd.DataFrame(
+        imputer.fit_transform(df_numeric), columns=numeric_cols, index=df_numeric.index
+    )
+
+    # Attach the imputed numeric columns to the frame that already holds time_stamp
+    df_processed[numeric_cols] = df_imputed_numeric
+else:
+    # If there are no numeric columns, simply keep the original non-numeric data
+    df_processed = df_processed.join(df.drop(columns=["time_stamp"]))
+
+# Create a temporary CSV file; write to it after the handle is closed so
+# the path is also writable on Windows.
+with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp_file:
+    temp_csv_path = tmp_file.name
+df_processed.to_csv(temp_csv_path, index=False)
+
+# Pass the path of the temporary CSV file to the quick_fault_detector
+# quick_fault_detector, quick_fault_detector_df = quick_fault_detector(temp_csv_path, None, "train_test_bool", None, "time_stamp", "status_type_bool")
+
+prediction_results, events, _event_details, metadata = quick_fault_detector(
+    csv_data_path=temp_csv_path,
+    csv_test_data_path=test_file_path,
+    mode="train",
+    time_column_name="time_stamp",     # optional, if you need timestamp parsing
+    model_directory=output_root_path,  # optional; defaults to the package setting
+    model_subdir="asset_0",            # optional; becomes a subfolder under model_directory
+    model_name="farm_b"                # optional; final folder for saved artifacts
+)
+
+
+# Optionally, remove the temporary file after use
+# os.remove(temp_csv_path)
diff --git a/run_predict.py b/run_predict.py
new file mode 100644
index 0000000..630f906
--- /dev/null
+++ b/run_predict.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+"""Utility script for running the quick fault detector in predict mode.
+ +This executable mirrors the high level behaviour of :mod:`run.py`, but focuses on +loading a previously trained model and applying it to new data. At a minimum the +user has to provide the path to the model directory and the CSV file that should be +evaluated. Optionally a dedicated training dataset as well as metadata about the +input columns can be supplied. + +Example usage:: + + python run_predict.py \ + --model-path path/to/model_directory \ + --csv-predict-data path/to/predict.csv \ + --time-column time_stamp + +The script prints a short textual summary of the run and leaves the detailed +artefacts (plots, CSVs) to the underlying ``quick_fault_detector`` implementation. +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any, Dict, Optional + +from energy_fault_detector.quick_fault_detection import quick_fault_detector + + +DEFAULT_MODEL_PATH = ( + r"D:\Personal\Ideas\Wind turbine\CARE_To_Compare\CARE_To_Compare\Wind Farm B\models" + r"\asset_0\20251008_222632" +) +DEFAULT_PREDICT_DATA_PATH = ( + r"D:\Personal\Ideas\Wind turbine\CARE_To_Compare\CARE_To_Compare\Wind Farm B\asset_files" + r"\predict_6.csv" +) +DEFAULT_TIME_COLUMN = "time_stamp" +DEFAULT_ASSET = "asset_6" + + +def _parse_mapping(value: Optional[str]) -> Optional[Dict[str, Any]]: + """Parse a JSON dictionary supplied via the command line. + + Args: + value: Raw argument string. + + Returns: + Parsed dictionary or ``None`` when ``value`` is ``None``. + + Raises: + argparse.ArgumentTypeError: If the value is not valid JSON or does not + decode to a dictionary. + """ + + if value is None: + return None + + try: + mapping = json.loads(value) + except json.JSONDecodeError as exc: # pragma: no cover - handled at runtime + raise argparse.ArgumentTypeError( + f"Failed to decode mapping '{value}': {exc}" + ) from exc + + if not isinstance(mapping, dict): + raise argparse.ArgumentTypeError("Mapping must decode to a JSON object (dict).") + + return mapping + + +def build_argument_parser() -> argparse.ArgumentParser: + """Create the command line argument parser used by the script.""" + + parser = argparse.ArgumentParser( + description="Run the quick fault detector in predict mode using a saved model.", + ) + parser.add_argument( + "--model-path", + default=DEFAULT_MODEL_PATH, + help=( + "Path to the directory that contains the previously trained model. " + "Defaults to the predefined wind farm model when not supplied." + ), + ) + parser.add_argument( + "--csv-predict-data", + default=DEFAULT_PREDICT_DATA_PATH, + help=( + "CSV file that contains the data points to analyse for anomalies. " + "Defaults to the provided wind farm dataset when not supplied." + ), + ) + parser.add_argument( + "--csv-train-data", + help=( + "Optional CSV file that contains the training data. When omitted, no " + "additional training is performed and the provided model is reused." + ), + ) + parser.add_argument( + "--time-column", + default=DEFAULT_TIME_COLUMN, + help=( + "Name of the timestamp column in the provided CSV files. Defaults to " + "'time_stamp' for the predefined dataset." + ), + ) + parser.add_argument( + "--status-column", + help=( + "Name of the column describing the operating status of the asset." + " Used to identify rows that represent normal behaviour." + ), + ) + parser.add_argument( + "--status-mapping", + type=_parse_mapping, + help="JSON dictionary mapping status values to booleans (e.g. 
'{\"OK\": true}').", + ) + parser.add_argument( + "--train-test-column", + help=( + "Column indicating whether a row belongs to the training (True) or" + " prediction (False) portion of the dataset." + ), + ) + parser.add_argument( + "--train-test-mapping", + type=_parse_mapping, + help="JSON dictionary mapping train/test column values to booleans.", + ) + parser.add_argument( + "--min-anomaly-length", + type=int, + default=18, + help="Minimal number of consecutive anomalies to form an event (default: 18).", + ) + parser.add_argument( + "--save-dir", + help="Directory where diagnostic output (plots, CSVs) should be written.", + ) + parser.add_argument( + "--debug-plots", + action="store_true", + help="Enable creation of additional debug plots during prediction.", + ) + parser.add_argument( + "--asset-name", + default = DEFAULT_ASSET, + help=( + "Optional identifier for the analysed asset. When provided, prediction CSV files are " + "written to a 'prediction_output/' directory." + ), + ) + return parser + + +def validate_paths(model_path: str, csv_predict_data: str, csv_train_data: Optional[str]) -> None: + """Ensure that the provided filesystem paths point to existing resources.""" + + model_dir = Path(model_path) + if not model_dir.exists(): + raise FileNotFoundError(f"Model directory '{model_dir}' does not exist.") + + predict_path = Path(csv_predict_data) + if not predict_path.is_file(): + raise FileNotFoundError(f"Prediction data file '{predict_path}' was not found.") + + if csv_train_data is not None: + train_path = Path(csv_train_data) + if not train_path.is_file(): + raise FileNotFoundError(f"Training data file '{train_path}' was not found.") + + +def main() -> None: + """Entry point for command line execution.""" + + parser = build_argument_parser() + args = parser.parse_args() + + validate_paths( + model_path=args.model_path, + csv_predict_data=args.csv_predict_data, + csv_train_data=args.csv_train_data, + ) + + train_data_path = args.csv_train_data + + prediction_results, event_metadata, _event_details, _ = quick_fault_detector( + csv_data_path=train_data_path, + csv_test_data_path=args.csv_predict_data, + train_test_column_name=args.train_test_column, + train_test_mapping=args.train_test_mapping, + time_column_name=args.time_column, + status_data_column_name=args.status_column, + status_mapping=args.status_mapping, + min_anomaly_length=args.min_anomaly_length, + save_dir=args.save_dir, + enable_debug_plots=args.debug_plots, + mode="predict", + model_path=args.model_path, + asset_name=args.asset_name, + ) + + predicted_anomalies = prediction_results.predicted_anomalies + + if args.save_dir is not None: + output_dir = Path(args.save_dir) + else: + output_dir = Path(args.csv_predict_data).resolve().parent + + output_dir.mkdir(parents=True, exist_ok=True) + anomaly_details_path = output_dir / f"{Path(args.csv_predict_data).stem}_anomaly_details.csv" + predicted_anomalies.to_csv(anomaly_details_path) + + print("Prediction finished successfully.") + print(f"Number of detected events: {len(event_metadata)}") + print("Anomaly score preview:") + print(prediction_results.anomaly_score.head()) + print(f"Saved anomaly details to: {anomaly_details_path}") + + +if __name__ == "__main__": # pragma: no cover - script entry point + main() + diff --git a/tests/api/test_prediction_api.py b/tests/api/test_prediction_api.py new file mode 100644 index 0000000..8cd57ac --- /dev/null +++ b/tests/api/test_prediction_api.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +from types import 
SimpleNamespace +from typing import Tuple + +import pandas as pd +import pytest +from fastapi.testclient import TestClient + +from energy_fault_detector.api import prediction_api +from energy_fault_detector.api.prediction_api import ( + DataTypeMismatchError, + EmptyInputError, + InvalidModelError, + PredictionRequest, + PredictionSuccessResponse, + SchemaMismatchError, + TimestampValidationError, + run_prediction, +) +from energy_fault_detector.core.fault_detection_result import FaultDetectionResult + + +def _create_detector(result: FaultDetectionResult, expected_columns: Tuple[str, ...]): + column_selector = SimpleNamespace(feature_names_out_=list(expected_columns)) + imputer = SimpleNamespace(feature_names_in_=list(expected_columns)) + data_preprocessor = SimpleNamespace( + named_steps={"column_selector": column_selector, "imputer": imputer}, + get_feature_names_out=lambda: list(expected_columns), + ) + + class _Detector: + data_preprocessor = data_preprocessor + + @staticmethod + def predict(sensor_data: pd.DataFrame, root_cause_analysis: bool = False): # pylint: disable=unused-argument + return result + + return _Detector() + + +def _sample_fault_detection_result(timestamps: pd.DatetimeIndex) -> FaultDetectionResult: + sensor_values = pd.DataFrame( + { + "sensor_a": [1.0, 2.0, 3.0], + "sensor_b": [0.5, 0.6, 0.7], + }, + index=timestamps, + ) + + predicted_anomalies = pd.DataFrame( + { + "anomaly": [False, True, True], + "behaviour": ["normal", "anamoly", "anamoly"], + "anamoly_score": [0.1, 0.9, 1.1], + "threshold_score": [0.8, 0.8, 0.8], + "cumulative_anamoly_score": [0, 1, 2], + }, + index=timestamps, + ) + + anomaly_score = pd.DataFrame({"value": [0.1, 0.9, 1.1]}, index=timestamps) + + return FaultDetectionResult( + predicted_anomalies=predicted_anomalies, + reconstruction=sensor_values, + recon_error=sensor_values, + anomaly_score=anomaly_score, + bias_data=None, + arcana_losses=None, + tracked_bias=None, + ) + + +def test_run_prediction_success(): + timestamps = pd.to_datetime( + ["2024-01-01T00:00:00Z", "2024-01-01T00:01:00Z", "2024-01-01T00:02:00Z"] + ) + result = _sample_fault_detection_result(timestamps) + + detector = _create_detector(result, ("sensor_a", "sensor_b")) + + request = PredictionRequest( + model_path="dummy", + data=[ + {"time_stamp": ts.isoformat(), "sensor_a": float(idx + 1), "sensor_b": 0.5 + idx * 0.1} + for idx, ts in enumerate(timestamps) + ], + timestamp_column="time_stamp", + min_event_length=1, + ) + + event_data = result.reconstruction.iloc[1:] + event_meta = pd.DataFrame( + { + "start": [event_data.index[0]], + "end": [event_data.index[-1]], + "duration": [event_data.index[-1] - event_data.index[0]], + } + ) + + def events_factory(sensor_data: pd.DataFrame, boolean_information: pd.Series, min_event_length: int): # pylint: disable=unused-argument + return event_meta, [event_data] + + def event_analyzer(detector_obj, data, track_losses): # pylint: disable=unused-argument + importances = pd.Series({"sensor_a": 0.6, "sensor_b": 0.4}) + losses = pd.DataFrame( + {"Combined Loss": [0.2, 0.1], "Reconstruction Loss": [0.15, 0.05], "Regularization Loss": [0.05, 0.05]}, + index=[0, 50], + ) + losses.index.name = "Iteration" + return importances, losses + + response = run_prediction( + request, + detector_loader=lambda _: detector, + events_factory=events_factory, + event_analyzer=event_analyzer, + ) + + assert isinstance(response, PredictionSuccessResponse) + assert response.status == "success" + assert len(response.events) == 1 + assert 
response.events[0].event_id == 1 + assert len(response.event_sensor_data) == 1 + assert len(response.event_sensor_data[0].points) == len(event_data) + assert response.event_sensor_data[0].arcana_mean_importances == {"sensor_a": 0.6, "sensor_b": 0.4} + + +def test_run_prediction_schema_mismatch(): + timestamps = pd.to_datetime(["2024-01-01T00:00:00Z", "2024-01-01T00:01:00Z"]) + result = _sample_fault_detection_result(timestamps) + detector = _create_detector(result, ("sensor_a", "sensor_b")) + + request = PredictionRequest( + model_path="dummy", + data=[ + {"time_stamp": ts.isoformat(), "sensor_a": float(idx + 1)} + for idx, ts in enumerate(timestamps) + ], + timestamp_column="time_stamp", + min_event_length=1, + ) + + with pytest.raises(SchemaMismatchError): + run_prediction(request, detector_loader=lambda _: detector, events_factory=lambda *args, **kwargs: (pd.DataFrame(), [])) + + +def test_run_prediction_dtype_mismatch(): + timestamps = pd.to_datetime(["2024-01-01T00:00:00Z", "2024-01-01T00:01:00Z"]) + result = _sample_fault_detection_result(timestamps) + detector = _create_detector(result, ("sensor_a", "sensor_b")) + + request = PredictionRequest( + model_path="dummy", + data=[ + {"time_stamp": ts.isoformat(), "sensor_a": float(idx + 1), "sensor_b": "bad"} + for idx, ts in enumerate(timestamps) + ], + timestamp_column="time_stamp", + min_event_length=1, + ) + + with pytest.raises(DataTypeMismatchError): + run_prediction(request, detector_loader=lambda _: detector, events_factory=lambda *args, **kwargs: (pd.DataFrame(), [])) + + +def test_run_prediction_timestamp_validation(): + request = PredictionRequest( + model_path="dummy", + data=[{"sensor_a": 1.0, "sensor_b": 2.0}], + timestamp_column="time_stamp", + min_event_length=1, + ) + + with pytest.raises(TimestampValidationError): + run_prediction(request, detector_loader=lambda _: None, events_factory=lambda *args, **kwargs: (pd.DataFrame(), [])) + + +def test_run_prediction_invalid_model(): + timestamps = pd.to_datetime(["2024-01-01T00:00:00Z"]) + request = PredictionRequest( + model_path="dummy", + data=[{"time_stamp": timestamps[0].isoformat(), "sensor_a": 1.0, "sensor_b": 2.0}], + timestamp_column="time_stamp", + min_event_length=1, + ) + + def failing_loader(_: str): + raise InvalidModelError("model missing") + + with pytest.raises(InvalidModelError): + run_prediction(request, detector_loader=failing_loader, events_factory=lambda *args, **kwargs: (pd.DataFrame(), [])) + + +def test_run_prediction_empty_payload(): + request = PredictionRequest.construct( + model_path="dummy", + data=[], + timestamp_column="time_stamp", + min_event_length=1, + ) + + with pytest.raises(EmptyInputError): + run_prediction(request, detector_loader=lambda _: None, events_factory=lambda *args, **kwargs: (pd.DataFrame(), [])) + + +def test_predict_endpoint_success(monkeypatch): + response_payload = PredictionSuccessResponse(status="success", events=[], event_sensor_data=[]) + monkeypatch.setattr(prediction_api, "run_prediction", lambda req: response_payload) + client = TestClient(prediction_api.app) + + payload = { + "model_path": "dummy", + "data": [{"time_stamp": "2024-01-01T00:00:00Z", "sensor_a": 1.0, "sensor_b": 2.0}], + } + + response = client.post("/predict", json=payload) + assert response.status_code == 200 + assert response.json()["status"] == "success" + + +def test_predict_endpoint_error(monkeypatch): + def failing_run_prediction(req): # pylint: disable=unused-argument + raise SchemaMismatchError("missing columns") + + 
monkeypatch.setattr(prediction_api, "run_prediction", failing_run_prediction) + client = TestClient(prediction_api.app) + + payload = { + "model_path": "dummy", + "data": [{"time_stamp": "2024-01-01T00:00:00Z", "sensor_a": 1.0}], + } + + response = client.post("/predict", json=payload) + assert response.status_code == 409 + detail = response.json()["detail"] + assert detail["code"] == "EFD_SCHEMA_MISMATCH" + assert detail["status"] == "error" diff --git a/tests/config/test_config.py b/tests/config/test_config.py index bf29cba..21e754d 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -46,7 +46,8 @@ def test_init(self): self.assertDictEqual(conf.config_dict['root_cause_analysis'], {'alpha': 0.8, 'init_x_bias': 'recon', - 'num_iter': 200} + 'num_iter': 200, + 'ignore_features': ['windspeed', 'output_power']} ) def test_arcana_config(self): @@ -54,7 +55,8 @@ def test_arcana_config(self): self.assertTrue(conf.root_cause_analysis) self.assertDictEqual( conf.arcana_params, - {'alpha': 0.8, 'num_iter': 200, 'init_x_bias': 'recon'} + {'alpha': 0.8, 'num_iter': 200, 'init_x_bias': 'recon', + 'ignore_features': ['windspeed', 'output_power']} ) conf = Config(os.path.join(PROJECT_ROOT, './tests/test_data/test_config_no_rca.yaml')) diff --git a/tests/root_cause_analysis/test_arcana.py b/tests/root_cause_analysis/test_arcana.py index 4a25e2b..22ae063 100644 --- a/tests/root_cause_analysis/test_arcana.py +++ b/tests/root_cause_analysis/test_arcana.py @@ -104,3 +104,26 @@ def test_draw_samples(self): selection = arcana.draw_samples(x=self.data) self.assertTupleEqual(self.data[:-1].shape, self.data[selection].shape) + def test_ignore_features(self): + subset = self.data_frame.iloc[:20].copy() + subset.columns = [ + 'windspeed_avg', + 'output_power', + 'temperature', + 'pressure', + 'windspeed_peak', + 'other_feature' + ] + ignore_patterns = ['windspeed*', 'non_existing*'] + arcana = Arcana(model=self.ml_ae, num_iter=5, ignore_features=ignore_patterns) + bias, losses, _ = arcana.find_arcana_bias(subset, track_losses=True) + self.assertTrue((bias[['windspeed_avg', 'windspeed_peak']] == 0).all().all()) + self.assertEqual(arcana._ignored_columns, {'windspeed_avg', 'windspeed_peak'}) + ignore_cols = [self.data_frame.columns[0], 'non_existing_feature'] + arcana = Arcana(model=self.ml_ae, num_iter=5, ignore_features=ignore_cols) + subset = self.data_frame.iloc[:20] + bias, losses, _ = arcana.find_arcana_bias(subset, track_losses=True) + self.assertTrue((bias[ignore_cols[0]] == 0).all()) + # ensure optimisation still runs and logs losses + self.assertFalse(losses.empty) + diff --git a/tests/test_data/test_config.yaml b/tests/test_data/test_config.yaml index ee0f42e..23f0479 100644 --- a/tests/test_data/test_config.yaml +++ b/tests/test_data/test_config.yaml @@ -40,3 +40,6 @@ root_cause_analysis: alpha: 0.8 init_x_bias: recon num_iter: 200 + ignore_features: + - windspeed + - output_power diff --git a/tests/test_fault_detector.py b/tests/test_fault_detector.py index 5588ecd..811a77a 100644 --- a/tests/test_fault_detector.py +++ b/tests/test_fault_detector.py @@ -1,4 +1,5 @@ +import copy import os import unittest from unittest.mock import patch, MagicMock @@ -10,6 +11,7 @@ import numpy as np from energy_fault_detector.fault_detector import FaultDetector, Config +from energy_fault_detector.core.fault_detection_model import FaultDetectionModel mock_autoencoder = MagicMock() mock_data_preprocessor = MagicMock() @@ -99,6 +101,7 @@ def setUp(self) -> None: 
mock_data_preprocessor.reset_mock(return_value=True, side_effect=True) mock_score.reset_mock(return_value=True, side_effect=True) mock_threshold.reset_mock(return_value=True, side_effect=True) + mock_threshold.threshold = 0.5 def tearDown(self) -> None: # Remove the temporary directory after the test @@ -157,6 +160,27 @@ def test_load_models(self): self.assertEqual(call_args[1]['model_type'], name) self.assertEqual(call_args[1]['model_directory'], os.path.join('path_to_saved_models', name)) + def test_load_models_preserves_ignore_features(self): + fault_detector = self._create_fault_detector(self.conf) + existing_factory = fault_detector._model_factory + existing_ignore = tuple(fault_detector.config.arcana_params.get('ignore_features', [])) + + with tempfile.TemporaryDirectory() as model_dir: + for subdir in ['data_preprocessor', 'autoencoder', 'threshold_selector', 'anomaly_score']: + os.makedirs(os.path.join(model_dir, subdir), exist_ok=True) + + config_without_ignore = copy.deepcopy(self.conf.config_dict) + config_without_ignore.setdefault('root_cause_analysis', {}).pop('ignore_features', None) + temp_config = Config(config_dict=config_without_ignore) + temp_config.write_config(os.path.join(model_dir, 'config.yaml'), overwrite=True) + + load_side_effects = ['prep', 'ae', 'threshold', 'score'] + with patch.object(FaultDetectionModel, '_load_pickled_model', side_effect=load_side_effects): + fault_detector.load_models(model_path=model_dir) + + self.assertIs(fault_detector._model_factory, existing_factory) + self.assertEqual(tuple(fault_detector.config.arcana_params.get('ignore_features', [])), existing_ignore) + def test_train(self): self.conf.write_config = MagicMock() fault_detector = self._create_fault_detector(self.conf) @@ -243,8 +267,16 @@ def test_predict(self, mock_find_arcana_bias): # expected results recon = self.predictions recon.index = [0, 1, 2] - anomalies = pd.DataFrame(data=[[False], [False], [True]], - columns=['anomaly']) + anomalies = pd.DataFrame( + data={ + 'anomaly': [False, False, True], + 'behaviour': ['normal', 'normal', 'anamoly'], + 'anamoly_score': [0.1, 0.2, 0.15], + 'threshold_score': [0.5, 0.5, 0.5], + 'cumulative_anamoly_score': [0, 0, 1], + 'anamolous fields': ['', '', 'c'], + } + ) pd.testing.assert_frame_equal(results.reconstruction, recon) pd.testing.assert_frame_equal(results.predicted_anomalies, anomalies) diff --git a/tests/test_model_registry.py b/tests/test_model_registry.py new file mode 100644 index 0000000..8e54513 --- /dev/null +++ b/tests/test_model_registry.py @@ -0,0 +1,54 @@ +"""Tests for the model registry helper.""" + +from pathlib import Path + +import pytest + +from energy_fault_detector.api.model_registry import ModelNotFoundError, ModelRegistry + + +def _create_model_dir(base: Path, model_name: str, version: str) -> Path: + path = base / model_name / version + (path / "config.yaml").parent.mkdir(parents=True, exist_ok=True) + (path / "config.yaml").write_text("model: test", encoding="utf-8") + return path + + +def test_resolve_latest_version(tmp_path): + registry = ModelRegistry(root_directory=tmp_path) + _create_model_dir(tmp_path, "turbine", "20230101") + latest = _create_model_dir(tmp_path, "turbine", "20240202") + + resolved_path, version = registry.resolve("turbine") + + assert resolved_path == latest + assert version == "20240202" + + +def test_resolve_specific_version(tmp_path): + registry = ModelRegistry(root_directory=tmp_path) + expected = _create_model_dir(tmp_path, "asset", "v1") + + resolved_path, version = 
registry.resolve("asset", "v1") + + assert resolved_path == expected + assert version == "v1" + + +def test_resolve_versionless_model(tmp_path): + registry = ModelRegistry(root_directory=tmp_path) + path = tmp_path / "single" + (path / "config.yaml").parent.mkdir(parents=True, exist_ok=True) + (path / "config.yaml").write_text("model: test", encoding="utf-8") + + resolved_path, version = registry.resolve("single") + + assert resolved_path == path + assert version == "single" + + +def test_resolve_missing_model(tmp_path): + registry = ModelRegistry(root_directory=tmp_path) + + with pytest.raises(ModelNotFoundError): + registry.resolve("unknown") diff --git a/tests/test_quick_fault_detector_rca.py b/tests/test_quick_fault_detector_rca.py new file mode 100644 index 0000000..3cd206a --- /dev/null +++ b/tests/test_quick_fault_detector_rca.py @@ -0,0 +1,87 @@ +"""Ensure that additional RCA ignore patterns are respected during prediction.""" + +from __future__ import annotations + +from typing import List + +import pytest + +pd = pytest.importorskip("pandas") + +from energy_fault_detector.quick_fault_detection.quick_fault_detector import quick_fault_detector + + +class DummyResult: + def __init__(self) -> None: + self.predicted_anomalies = pd.DataFrame({"anomaly": [False], "critical_event": [False]}) + + def save(self, *_args, **_kwargs) -> None: + return None + + +class DummyFaultDetector: + instances: List["DummyFaultDetector"] = [] + + def __init__(self, config, **_kwargs) -> None: # noqa: D401 - matches interface + self.config = config + self.instances.append(self) + + def load_models(self, *_args, **_kwargs) -> None: + return None + + def predict(self, *_args, **_kwargs): + return DummyResult() + + +def test_quick_fault_detector_extends_ignore_features(monkeypatch, tmp_path): + DummyFaultDetector.instances = [] + + def fake_load_train_test_data(**_kwargs): + data = pd.DataFrame({"a": [1.0, 2.0]}) + return data, pd.Series([True, False]), data + + def fake_create_events(**_kwargs): + return pd.DataFrame(), [] + + def fake_analyze_event(**_kwargs): + return pd.Series(dtype=float), pd.DataFrame() + + monkeypatch.setattr( + "energy_fault_detector.quick_fault_detection.quick_fault_detector.load_train_test_data", + fake_load_train_test_data, + ) + monkeypatch.setattr( + "energy_fault_detector.quick_fault_detection.quick_fault_detector.create_events", + fake_create_events, + ) + monkeypatch.setattr( + "energy_fault_detector.quick_fault_detection.quick_fault_detector.analyze_event", + fake_analyze_event, + ) + monkeypatch.setattr( + "energy_fault_detector.quick_fault_detection.quick_fault_detector.generate_output_plots", + lambda **_kwargs: None, + ) + monkeypatch.setattr( + "energy_fault_detector.quick_fault_detection.quick_fault_detector._save_prediction_results", + lambda **_kwargs: None, + ) + monkeypatch.setattr( + "energy_fault_detector.quick_fault_detection.quick_fault_detector.FaultDetector", + DummyFaultDetector, + ) + + csv_path = tmp_path / "data.csv" + csv_path.write_text("a\n1\n2\n", encoding="utf-8") + + quick_fault_detector( + csv_data_path=None, + csv_test_data_path=str(csv_path), + mode="predict", + model_path=str(tmp_path), + rca_ignore_features=["feature_a"], + ) + + assert DummyFaultDetector.instances, "FaultDetector was not instantiated" + ignore_features = DummyFaultDetector.instances[0].config.config_dict["root_cause_analysis"]["ignore_features"] + assert "feature_a" in ignore_features diff --git a/tests/test_root_cause_ignore_features.py 
b/tests/test_root_cause_ignore_features.py new file mode 100644 index 0000000..3f3fea8 --- /dev/null +++ b/tests/test_root_cause_ignore_features.py @@ -0,0 +1,121 @@ +import pytest + +pd = pytest.importorskip('pandas') + +from energy_fault_detector.config import Config +from energy_fault_detector.fault_detector import FaultDetector + + +class _IdentityPreprocessor: + def transform(self, data): + return data + + def inverse_transform(self, data): + return data + + +class _IdentityAutoencoder: + conditional_features = () + + def predict(self, x, return_conditions=False, verbose=None): + return x + + def get_reconstruction_error(self, x): + return pd.DataFrame(0.0, index=x.index, columns=x.columns) + + +class _ZeroAnomalyScore: + def transform(self, recon_error): + return pd.Series(0.0, index=recon_error.index) + + +class _ZeroThresholdSelector: + threshold = 0.0 + + def predict(self, scores): + return pd.Series(False, index=scores.index) + + +def _build_config(ignore_features=None): + config_dict = { + 'train': { + 'anomaly_score': {'name': 'rmse', 'params': {}}, + 'autoencoder': {'name': 'default', 'params': {'layers': [1]}}, + 'data_preprocessor': {'params': {}}, + 'threshold_selector': {'name': 'quantile', 'params': {}}, + }, + 'root_cause_analysis': {}, + } + if ignore_features is not None: + config_dict['root_cause_analysis']['ignore_features'] = ignore_features + return Config(config_dict=config_dict) + + +def _build_fault_detector(config): + detector = FaultDetector.__new__(FaultDetector) + detector.config = config + detector.data_preprocessor = _IdentityPreprocessor() + detector.autoencoder = _IdentityAutoencoder() + detector.anomaly_score = _ZeroAnomalyScore() + detector.threshold_selector = _ZeroThresholdSelector() + return detector + + +def test_run_root_cause_analysis_uses_configured_ignore_features(monkeypatch): + config = _build_config(ignore_features=['temp_*']) + detector = _build_fault_detector(config) + + captured_kwargs = {} + + class DummyArcana: + def __init__(self, model, **kwargs): + captured_kwargs.update(kwargs) + + def find_arcana_bias(self, x, track_losses, track_bias): + return x.copy(), pd.DataFrame(), [] + + monkeypatch.setattr('energy_fault_detector.fault_detector.Arcana', DummyArcana) + + sensor_data = pd.DataFrame({'temp_a': [1.0], 'temp_b': [2.0]}) + detector.run_root_cause_analysis(sensor_data=sensor_data) + + assert captured_kwargs.get('ignore_features') == ['temp_*'] + + +def test_run_root_cause_analysis_allows_ignore_feature_override(monkeypatch): + config = _build_config(ignore_features=['temp_*']) + detector = _build_fault_detector(config) + + captured_kwargs = {} + + class DummyArcana: + def __init__(self, model, **kwargs): + captured_kwargs.update(kwargs) + + def find_arcana_bias(self, x, track_losses, track_bias): + return x.copy(), pd.DataFrame(), [] + + monkeypatch.setattr('energy_fault_detector.fault_detector.Arcana', DummyArcana) + + sensor_data = pd.DataFrame({'temp_a': [1.0], 'temp_b': [2.0]}) + detector.run_root_cause_analysis(sensor_data=sensor_data, ignore_features=['pressure_*']) + + assert captured_kwargs.get('ignore_features') == ['pressure_*'] + + +def test_predict_passes_configured_ignore_features(monkeypatch): + config = _build_config(ignore_features=['temp_*']) + detector = _build_fault_detector(config) + + captured_kwargs = {} + + def _run_root_cause_analysis(self, sensor_data, track_losses, track_bias, ignore_features=None): + captured_kwargs['ignore_features'] = ignore_features + return pd.DataFrame(), pd.DataFrame(), [] + + 
monkeypatch.setattr(FaultDetector, 'run_root_cause_analysis', _run_root_cause_analysis)
+
+    sensor_data = pd.DataFrame({'temp_a': [1.0, 2.0], 'temp_b': [3.0, 4.0]})
+    detector.predict(sensor_data=sensor_data, root_cause_analysis=True)
+
+    assert list(captured_kwargs.get('ignore_features')) == ['temp_*']
diff --git a/tests/threshold_selectors/test_quantile_threshold.py b/tests/threshold_selectors/test_quantile_threshold.py
index 740708a..b77bf0e 100644
--- a/tests/threshold_selectors/test_quantile_threshold.py
+++ b/tests/threshold_selectors/test_quantile_threshold.py
@@ -2,6 +2,7 @@
 from unittest import TestCase
 
 import numpy as np
+import pandas as pd
 from numpy.testing import assert_array_equal
 
 from energy_fault_detector.anomaly_scores.rmse_score import RMSEScore
@@ -38,6 +39,22 @@ def test_fit_without_label(self) -> None:
 
         assert_array_equal(exp_threshold_with_anomaly, self.threshold_selector.threshold)
 
+    def test_fit_with_dataframe_normal_index(self) -> None:
+        scores = pd.Series(self.rmse.transform(self.train_data))
+        normal_index = pd.DataFrame({'normal': self.normal_index}, index=scores.index)
+
+        self.threshold_selector.fit(scores, normal_index)
+
+        assert_array_equal(1.2392478743647883, self.threshold_selector.threshold)
+
+    def test_fit_with_all_false_labels_uses_all_scores(self) -> None:
+        scores = pd.Series(self.rmse.transform(self.train_data))
+        normal_index = pd.Series([False] * len(scores), index=scores.index)
+
+        self.threshold_selector.fit(scores, normal_index)
+
+        assert_array_equal(1.617323497052351, self.threshold_selector.threshold)
+
     def test_predict(self) -> None:
         # expected output
         exp_anomalies = [True]*5 + [False]*23 + [True]*5
diff --git a/tests/utils/test_asset_dataset_splitter.py b/tests/utils/test_asset_dataset_splitter.py
new file mode 100644
index 0000000..9398e54
--- /dev/null
+++ b/tests/utils/test_asset_dataset_splitter.py
@@ -0,0 +1,48 @@
+from pathlib import Path
+
+import pytest
+
+pd = pytest.importorskip("pandas")
+
+from energy_fault_detector.utils.asset_dataset_splitter import split_asset_datasets
+
+
+def _write_csv(path: Path, data: pd.DataFrame) -> None:
+    path.write_text(data.to_csv(sep=";", index=False))
+
+
+def test_split_asset_datasets(tmp_path):
+    input_dir = tmp_path / "input"
+    input_dir.mkdir()
+
+    data = pd.DataFrame(
+        {
+            "asset_id": ["asset_a", "asset_a", "asset_a", "asset_b"],
+            "train_test": ["train", "prediction", "train", "prediction"],
+            "train_test_bool": [True, False, True, False],
+            "status_type_id": [0, 1, 3, 2],
+            "status_type_bool": [False, True, True, False],
+            "temperature_avg": [10, 11, 12, 13],
+            "temperature_max": [20, 21, 22, 23],
+            "sensor_value": [100, 110, 120, 130],
+            "time_stamp": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"],
+        }
+    )
+
+    _write_csv(input_dir / "dataset.csv", data)
+
+    split_asset_datasets(input_dir)
+
+    train_a = pd.read_csv(input_dir / "train_asset_a.csv", sep=";")
+    predict_a = pd.read_csv(input_dir / "predict_asset_a.csv", sep=";")
+    predict_b = pd.read_csv(input_dir / "predict_asset_b.csv", sep=";")
+
+    assert set(train_a.columns) == {"temperature_avg", "sensor_value", "time_stamp"}
+    assert set(predict_a.columns) == {"temperature_avg", "sensor_value", "time_stamp"}
+
+    assert len(train_a) == 2  # the two train rows; one of them also has normal status
+    assert len(predict_a) == 2  # prediction row + anomaly status row
+
+    # Asset B only has a prediction row with normal status. It should appear in the prediction output.
+    assert set(predict_b.columns) == {"temperature_avg", "sensor_value", "time_stamp"}
+    assert len(predict_b) == 1
diff --git a/tests/utils/test_feature_filters.py b/tests/utils/test_feature_filters.py
new file mode 100644
index 0000000..06df679
--- /dev/null
+++ b/tests/utils/test_feature_filters.py
@@ -0,0 +1,47 @@
+import pytest
+
+pd = pytest.importorskip('pandas')
+
+from energy_fault_detector.utils.feature_filters import (
+    mask_ignored_features,
+    resolve_ignored_columns,
+)
+
+
+def test_resolve_ignored_columns_matches_and_reports_unmatched():
+    columns = ['windspeed_avg', 'output_power', 'temperature']
+    patterns = ['windspeed*', 'non_existent']
+
+    ignored, unmatched = resolve_ignored_columns(columns, patterns)
+
+    assert ignored == {'windspeed_avg'}
+    assert unmatched == {'non_existent'}
+
+
+def test_resolve_ignored_columns_matches_case_insensitively():
+    columns = ['Power_58_Avg', 'Other']
+    patterns = ['power_58_avg']
+
+    ignored, unmatched = resolve_ignored_columns(columns, patterns)
+
+    assert ignored == {'Power_58_Avg'}
+    assert unmatched == set()
+
+
+def test_mask_ignored_features_sets_columns_to_zero():
+    df = pd.DataFrame({
+        'windspeed_avg': [1.0, -2.0],
+        'temperature': [3.0, 4.0],
+    })
+
+    masked, ignored, unmatched = mask_ignored_features(df, ['windspeed*'])
+
+    assert ignored == {'windspeed_avg'}
+    assert unmatched == set()
+    assert masked['windspeed_avg'].tolist() == [0.0, 0.0]
+    assert masked['temperature'].tolist() == [3.0, 4.0]
+    # original DataFrame is not modified
+    assert df['windspeed_avg'].tolist() == [1.0, -2.0]
+
diff --git a/tests/utils/test_unsupervised_behavior_model.py b/tests/utils/test_unsupervised_behavior_model.py
new file mode 100644
index 0000000..62b3fc7
--- /dev/null
+++ b/tests/utils/test_unsupervised_behavior_model.py
@@ -0,0 +1,67 @@
+"""Tests for the unsupervised behaviour model utility."""
+
+from __future__ import annotations
+
+import pytest
+
+np = pytest.importorskip("numpy")
+pd = pytest.importorskip("pandas")
+
+from energy_fault_detector.utils.unsupervised_behavior_model import (
+    SensorRange,
+    UnsupervisedBehaviorModel,
+)
+
+
+def _create_synthetic_data(seed: int = 7) -> pd.DataFrame:
+    rng = np.random.default_rng(seed)
+    base = pd.DataFrame(
+        {
+            "power": rng.normal(loc=1.0, scale=0.05, size=512),
+            "wind_speed": rng.normal(loc=12.0, scale=0.5, size=512),
+        }
+    )
+    return base
+
+
+def test_sensor_range_contains_works_for_bounds() -> None:
+    sensor_range = SensorRange(lower=0.0, upper=10.0)
+    series = pd.Series([-1.0, 0.0, 5.0, 10.0, 11.0])
+
+    result = sensor_range.contains(series)
+
+    np.testing.assert_array_equal(result.values, np.array([False, True, True, True, False]))
+
+
+def test_fit_computes_normal_ranges() -> None:
+    data = _create_synthetic_data()
+
+    model = UnsupervisedBehaviorModel(contamination=0.05, quantile_bounds=(0.05, 0.95), random_state=1)
+    model.fit(data)
+    normal_ranges = model.get_normal_ranges()
+
+    assert list(normal_ranges.index) == ["power", "wind_speed"]
+    # Verify that the medians are close to the expected centre points.
+
+    assert abs(normal_ranges.loc["power", "median"] - 1.0) < 0.01
+    assert abs(normal_ranges.loc["wind_speed", "median"] - 12.0) < 0.1
+
+
+def test_predict_flags_outliers() -> None:
+    data = _create_synthetic_data()
+    model = UnsupervisedBehaviorModel(contamination=0.05, quantile_bounds=(0.05, 0.95), random_state=1)
+    model.fit(data)
+
+    evaluation = pd.DataFrame(
+        {
+            "power": [1.02, 3.0],
+            "wind_speed": [12.1, 5.0],
+        }
+    )
+
+    predictions = model.predict(evaluation)
+
+    # Truthiness checks instead of identity checks: ``is True``/``is False``
+    # fail for numpy bools even when the value is correct.
+    assert not predictions.loc[0, "is_anomaly"]
+    assert predictions.loc[1, "is_anomaly"]
+    assert predictions.loc[1, "range_violation_count"] >= 1
+    assert "wind_speed" in predictions.loc[1, "violated_sensors"]
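For readers skimming `analyze_dataframe` (added in both `pre-process.py` and `run.py` above), the duplicate-sequence check packs a run-length computation into one chained expression: `ne(value).cumsum()` stamps every consecutive run of the watched value with a single group id, and the grouped `cumsum` then counts how deep into the run each row is. A standalone sketch of just that idiom, using only standard pandas:

```python
import pandas as pd

s = pd.Series([0, 0, 0, 5, 0, 0])

# The cumulative sum only increases on non-zero values, so every
# consecutive run of zeros shares one group id: [0, 0, 0, 1, 1, 1]
group_ids = s.ne(0).cumsum()

# Within each run, count how many zeros have been seen so far:
# [1, 2, 3, 0, 1, 2]
run_lengths = s.eq(0).astype(int).groupby(group_ids).cumsum()

# Positions where a zero-run exceeds the allowed length (here: 2).
print(s[run_lengths > 2].index.tolist())  # [2]
```

If the data is 10-minute SCADA, the default `n_max_duplicates=144` corresponds to a sensor stuck at the same value for a full day, which is presumably the motivation for that default.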
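`run_predict.py` takes the status and train/test mappings as JSON strings; `_parse_mapping` insists on a JSON object. A complete invocation could look like the following, where all paths, column names, and mapping values are illustrative rather than repository defaults:

```bash
python run_predict.py \
    --model-path models/asset_0/20251008_222632 \
    --csv-predict-data data/predict_6.csv \
    --time-column time_stamp \
    --status-column status_type_id \
    --status-mapping '{"0": true, "2": true, "1": false}' \
    --min-anomaly-length 18 \
    --save-dir results/asset_6 \
    --asset-name asset_6
```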
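`tests/api/test_prediction_api.py` exercises a lower-level entry point, `energy_fault_detector.api.prediction_api`, that accepts inline records instead of a dataset path. Based only on the `PredictionRequest` fields and the `/predict` route used in those tests, a client call could look like this; the host, port, and model path are assumptions:

```python
import requests

payload = {
    "model_path": "models/asset_0/20251008_222632",  # assumed local model directory
    "data": [
        {"time_stamp": "2024-01-01T00:00:00Z", "sensor_a": 1.0, "sensor_b": 0.5},
        {"time_stamp": "2024-01-01T00:01:00Z", "sensor_a": 2.0, "sensor_b": 0.6},
    ],
    "timestamp_column": "time_stamp",
    "min_event_length": 1,
}

response = requests.post("http://localhost:8000/predict", json=payload, timeout=60)
if response.ok:
    print(response.json()["status"])  # "success"
else:
    # Validation failures surface as a structured detail object,
    # e.g. {"code": "EFD_SCHEMA_MISMATCH", "status": "error", ...}
    print(response.json()["detail"])
```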
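`tests/test_model_registry.py` pins down the directory layout the registry resolves against: `<root>/<model_name>/<version>/`, with the lexicographically latest version chosen when none is requested, and a version-less model living directly under `<root>/<model_name>/`. A usage sketch under those assumptions:

```python
from energy_fault_detector.api.model_registry import ModelNotFoundError, ModelRegistry

# Assumed layout:
#   models/turbine/20230101/config.yaml
#   models/turbine/20240202/config.yaml
registry = ModelRegistry(root_directory="models")

path, version = registry.resolve("turbine")              # -> .../20240202 (latest)
path, version = registry.resolve("turbine", "20230101")  # pin an explicit version

try:
    registry.resolve("unknown")
except ModelNotFoundError:
    print("no model with that name under the configured root")
```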
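Several of the test changes revolve around the new `ignore_features` option: `tests/test_data/test_config.yaml` places it under `root_cause_analysis`, `tests/test_quick_fault_detector_rca.py` shows `quick_fault_detector(..., rca_ignore_features=[...])` extending that configured list at prediction time, and `tests/utils/test_feature_filters.py` documents the matching rules. A short sketch of the two helpers; the dataframe is illustrative:

```python
import pandas as pd

from energy_fault_detector.utils.feature_filters import (
    mask_ignored_features,
    resolve_ignored_columns,
)

df = pd.DataFrame({"windspeed_avg": [7.1, 7.3], "Output_Power": [1.0, 1.1]})

# Glob patterns match case-insensitively; patterns that hit nothing are
# reported back rather than raising, so stale config entries fail softly.
ignored, unmatched = resolve_ignored_columns(list(df.columns), ["output_power", "rotor_*"])
# ignored == {"Output_Power"}, unmatched == {"rotor_*"}

# mask_ignored_features returns a zeroed copy; the input frame stays intact.
masked, ignored, unmatched = mask_ignored_features(df, ["windspeed*"])
# masked["windspeed_avg"] is all zeros, df is unchanged
```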
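Finally, `tests/utils/test_unsupervised_behavior_model.py` outlines the surface of the new behaviour-model utility: fit on a sensor frame, read back per-sensor ranges (the tests check a `median` column), and score new rows. A sketch using only the calls and output columns exercised by those tests:

```python
import numpy as np
import pandas as pd

from energy_fault_detector.utils.unsupervised_behavior_model import UnsupervisedBehaviorModel

rng = np.random.default_rng(0)
train = pd.DataFrame({
    "power": rng.normal(1.0, 0.05, 512),
    "wind_speed": rng.normal(12.0, 0.5, 512),
})

model = UnsupervisedBehaviorModel(
    contamination=0.05,            # expected share of outliers in the training data
    quantile_bounds=(0.05, 0.95),  # per-sensor quantiles defining the normal range
    random_state=1,
)
model.fit(train)
print(model.get_normal_ranges())   # one row per sensor, including a 'median' column

scored = model.predict(pd.DataFrame({"power": [3.0], "wind_speed": [5.0]}))
# Output columns seen in the tests: is_anomaly, range_violation_count, violated_sensors
print(bool(scored.loc[0, "is_anomaly"]), scored.loc[0, "violated_sensors"])
```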