VenusDataAI/pipeline-anomaly-detector

Pipeline Anomaly Detector


Pipeline Anomaly Detector is a production-ready ML toolkit for detecting anomalies in data pipeline runs. It collects run metadata from dbt, Airflow, or any custom source, extracts 11 statistical features per run, and scores each run with a choice of Z-Score, Isolation Forest, or Ensemble detectors. When a run looks anomalous, it sends an alert to Slack.


Quickstart

```bash
# 1. Install
pip install pipeline-anomaly-detector

# 2. Collect runs from dbt
pad collect --source dbt --dbt-dir "./target/run_results.json" --since 2024-01-01 --output runs.jsonl

# 3. Train an ensemble detector
pad train --input runs.jsonl --detector ensemble --output ./models

# 4. Score new runs
pad score-batch --input new_runs.jsonl --model ./models/global_ensemble_*.joblib --db scores.db

# 5. Explain an anomaly
pad explain --run-id anomaly_duration_000 --model ./models/global_ensemble_*.joblib --db scores.db
```
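The `collect` step above writes one JSON object per line to `runs.jsonl`. A minimal sketch of producing and reading such a record is shown below; the field names are illustrative assumptions, not the exact schema `pad collect` emits.

```python
import json

# Hypothetical run record; the real schema produced by `pad collect`
# may differ. All field names here are assumptions for illustration.
run = {
    "run_id": "dbt_daily_20240115",
    "source": "dbt",
    "started_at": "2024-01-15T03:00:00Z",
    "duration_seconds": 412.5,
    "rows_processed": 1_200_000,
    "rows_rejected": 300,
    "status": "success",
}

# JSONL: one JSON document per line.
with open("runs.jsonl", "w") as f:
    f.write(json.dumps(run) + "\n")

# Round-trip check: each line is one run record.
with open("runs.jsonl") as f:
    loaded = [json.loads(line) for line in f]
```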

Architecture

```mermaid
graph TD
    A[Pipeline Sources] --> B[Collectors]
    B --> |PipelineRun objects| C[FeatureExtractor]
    C --> |11 features| D[Detectors]

    subgraph Collectors
        B1[DbtCollector]
        B2[AirflowCollector]
        B3[GenericCollector]
    end

    subgraph Detectors
        D1[ZScoreDetector]
        D2[IsolationForestDetector]
        D3[EnsembleDetector]
    end

    D --> E[Scorer]
    E --> F[(SQLite DB)]
    E --> G[AlertRouter]
    G --> H[Slack]
    G --> I[Log]

    J[ModelStore] --> |joblib| D
    D --> |fitted model| J
```
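The Detectors-to-Scorer step in the diagram can be sketched as a weighted average of per-detector scores. The weights and the 0.5 decision threshold below are illustrative assumptions, not the shipped defaults.

```python
# Combine per-detector anomaly scores (each assumed to be in [0, 1])
# into a single ensemble score via a normalized weighted average.
def ensemble_score(scores: dict, weights: dict) -> float:
    total = sum(weights.values())
    return sum(scores[name] * w for name, w in weights.items()) / total

scores = {"zscore": 0.9, "isolation_forest": 0.6}
weights = {"zscore": 0.5, "isolation_forest": 0.5}

combined = ensemble_score(scores, weights)   # 0.75
is_anomaly = combined >= 0.5                 # hypothetical threshold
```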

Detector Comparison

| Detector | Algorithm | Interpretability | Multi-variate | When to use |
|---|---|---|---|---|
| `zscore` | Rolling z-score | High (per-feature z) | No | Simple baselines, limited data |
| `isolation_forest` | sklearn `IsolationForest` | Medium (permutation importance) | Yes | Complex multi-feature anomalies |
| `ensemble` | Weighted average | Medium (union of features) | Yes | Production default |

See docs/detector_comparison.md for full details.
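As a toy illustration of the `zscore` row, here is a self-contained rolling z-score detector over a 30-run window. The window size matches the `duration_z` feature below, but the 3-sigma threshold is an assumption, and this is not the package's implementation.

```python
from collections import deque
from statistics import mean, stdev

class RollingZScore:
    """Score each value against the mean/stdev of the last `window` values."""

    def __init__(self, window: int = 30, threshold: float = 3.0):
        self.history = deque(maxlen=window)
        self.threshold = threshold

    def score(self, value: float) -> float:
        if len(self.history) < 2:
            z = 0.0  # not enough history to estimate spread
        else:
            mu, sigma = mean(self.history), stdev(self.history)
            z = 0.0 if sigma == 0 else (value - mu) / sigma
        self.history.append(value)
        return z

det = RollingZScore()
for v in [100, 102, 98, 101, 99, 100]:   # stable baseline durations
    det.score(v)
z = det.score(600)                        # far outside the window
anomalous = abs(z) > det.threshold
```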


Features

11 features are extracted per pipeline run:

| Feature | Description |
|---|---|
| `duration_seconds` | Raw wall-clock duration |
| `duration_z` | Z-score vs rolling 30-run window |
| `rows_processed_log1p` | log1p of row count |
| `row_count_delta_pct` | % change vs previous run |
| `null_rate_max` | Max null rate across columns |
| `null_rate_delta` | Change in null rate |
| `hour_of_day` | UTC hour (0–23) |
| `day_of_week` | 0=Monday … 6=Sunday |
| `is_weekend` | Binary weekend flag |
| `status_is_success` | Binary success flag |
| `rejection_rate` | rows_rejected / rows_processed |

See docs/feature_reference.md for the full reference.
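A few of the features above can be derived directly from a run record, as sketched here. The input field names and exact formulas are assumptions for illustration; see the feature reference for the authoritative definitions.

```python
import math
from datetime import datetime

def extract_features(run: dict) -> dict:
    """Derive a subset of the 11 features from a single run record."""
    started = datetime.fromisoformat(run["started_at"])  # assumed UTC
    rows = run["rows_processed"]
    return {
        "duration_seconds": run["duration_seconds"],
        "rows_processed_log1p": math.log1p(rows),
        "hour_of_day": started.hour,
        "day_of_week": started.weekday(),            # 0=Monday
        "is_weekend": int(started.weekday() >= 5),
        "status_is_success": int(run["status"] == "success"),
        "rejection_rate": run["rows_rejected"] / rows if rows else 0.0,
    }

feats = extract_features({
    "started_at": "2024-01-13T08:30:00+00:00",  # a Saturday
    "duration_seconds": 412.5,
    "rows_processed": 1_000_000,
    "rows_rejected": 500,
    "status": "success",
})
```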


Documentation


Development

```bash
# Clone and install in editable mode with dev extras
git clone https://github.com/your-org/pipeline-anomaly-detector.git
cd pipeline-anomaly-detector
pip install -e ".[dev]"

# Run tests
pytest tests/ -v --cov=pipeline_anomaly_detector

# Open the exploration notebook
jupyter notebook notebooks/exploration.ipynb
```

License

MIT License; see LICENSE.
