Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# CI: run the centralized model-interface test suite on every push and
# pull request targeting main.
name: Run Model Interface Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout repository
        # checkout@v2 runs on the deprecated Node 12 runtime; v4 is the
        # currently supported release.
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
          # Cache pip downloads keyed on the requirements file to speed up CI.
          cache: 'pip'
          cache-dependency-path: anomaly-detection-container/requirements.txt

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r anomaly-detection-container/requirements.txt

      - name: Run centralized model tests
        run: |
          python anomaly-detection-container/tests/all_tests.py
10 changes: 8 additions & 2 deletions anomaly-detection-container/anomaly_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ def process_data(self, data):
# Process constellation-level anomalies
for model in self.constellation_models:
try:
is_anomaly, details = model.detect(data)
timestep = data.get('time', 0)
this_satellite_id = data.get('satellite_id', 0)
channel_data = data.get('data', {})
is_anomaly, details = model.detect(timestep, this_satellite_id, channel_data)
if is_anomaly:
# details could be a list of anomalies
if isinstance(details.get('anomalies'), list):
Expand All @@ -161,7 +164,10 @@ def process_data(self, data):
try:
logging.info(f"Processing satellite-specific anomalies for satellite {satellite_id} with model {model_name}")
logging.info(f"Data: {data}")
is_anomaly, details = model.detect(data)
timestep = data.get('time', 0)
channel_data = data.get('data', {})
satellite_id = data.get('satellite_id', 0)
is_anomaly, details = model.detect(timestep, satellite_id, channel_data)
if is_anomaly:
self.record_anomaly(details, True)
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Download the NASA SMAP/MSL anomaly-detection dataset next to this script."""
import os

import kagglehub

# Directory alongside this script where the dataset files should live.
current_path = os.path.dirname(os.path.abspath(__file__))
nasa_smap_msl_path = os.path.join(current_path, "nasa-smap-msl")

# Create the target directory if needed (no-op when it already exists).
os.makedirs(nasa_smap_msl_path, exist_ok=True)

# NOTE(review): kagglehub.dataset_download's `path` parameter selects a file
# *within* the dataset, not a download destination (downloads are cached under
# ~/.cache/kagglehub by default) — confirm this call does what is intended.
path = kagglehub.dataset_download(
    "patrickfleith/nasa-anomaly-detection-dataset-smap-msl",
    path=nasa_smap_msl_path,
)

print("Path to NASA SMAP/MSL dataset files:", path)
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@ class ConstellationAnomalyDetectionModel(ABC):
"""

@abstractmethod
def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails] | None]:
    """
    Process incoming data and detect anomalies at the constellation level.

    Since data arrives one satellite at a time, implementations must track
    satellites together internally to reason across the constellation.

    Parameters:
        time (int): The time of the data point.
        satellite_id (int): The ID of the satellite.
        data (dict): A dictionary containing the telemetry data from any
            channels at this timestep.

    Returns:
        bool: True if an anomaly is detected, False otherwise.
        list[AnomalyDetails] | None: Details of the anomaly (or multiple
            anomalies) if detected. Otherwise, return None.
    """
    pass

Expand All @@ -44,3 +40,8 @@ def load_model(self):
Load or initialize the model. This method can be used to load pre-trained models.
"""
pass

@abstractmethod
def save_model(self, model_path: str):
    """
    Persist model weights or parameters.

    Parameters:
        model_path (str): Destination path for the serialized model state.
    """
    pass
Original file line number Diff line number Diff line change
@@ -1,61 +1,98 @@
from models.constellation.constellation_anomaly_detection_model import ConstellationAnomalyDetectionModel, AnomalyDetails
import numpy as np


class ConstellationOutlierDetection(ConstellationAnomalyDetectionModel):
    """
    Example constellation-level anomaly detection model using statistical
    outliers over a sliding time window.

    Readings from every satellite are pooled per channel; a reading is flagged
    when it deviates from the pooled windowed mean by more than
    `std_threshold` standard deviations.
    """

    def __init__(self, std_threshold=2, window_seconds=30):
        """
        Parameters:
            std_threshold (float): Threshold (in multiples of standard
                deviation) for detecting outliers.
            window_seconds (int): Duration (in seconds) of the sliding window
                over which to calculate statistics.
        """
        self.std_threshold = std_threshold
        self.window = window_seconds
        # History of readings per channel.
        # Keys are channel names; values are lists of (time, value) tuples.
        self.channel_history = {}
        self.load_model()

    def load_model(self):
        """
        Initialize any required parameters or load pre-trained models.
        For a simple statistical model, no initialization is needed.
        """
        pass

    def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]:
        """
        Detect anomalies on a per-satellite basis by comparing each channel's
        current reading against the running mean and standard deviation
        computed over a sliding window across all satellites.

        Parameters:
            time (int): The time of the data point.
            satellite_id (int): The ID of the satellite.
            data (dict): A dictionary containing telemetry data channels and
                their values.

        Returns:
            tuple[bool, list[AnomalyDetails]]:
                - bool: True if at least one anomaly is detected; False otherwise.
                - list[AnomalyDetails]: List of anomaly details instances.
        """
        anomalies = []

        # Record the current readings (numeric channels only).
        for channel, value in data.items():
            if not isinstance(value, (int, float)):
                continue
            self.channel_history.setdefault(channel, []).append((time, value))

        # Prune entries that have aged out of the sliding window.
        cutoff = time - self.window
        for channel in self.channel_history:
            self.channel_history[channel] = [
                (t, v) for t, v in self.channel_history[channel] if t >= cutoff
            ]

        # Compare each current reading against the windowed statistics.
        for channel, current_value in data.items():
            if not isinstance(current_value, (int, float)):
                continue

            history = self.channel_history.get(channel, [])
            # Need at least two points for a meaningful spread estimate.
            if len(history) < 2:
                continue

            values = [v for _, v in history]
            mean_val = np.mean(values)
            std_val = np.std(values)

            # No variation: nothing can be an outlier (also avoids divide-by-zero).
            if std_val == 0:
                continue

            # Flag the reading when it falls outside the confidence bounds.
            if abs(current_value - mean_val) > self.std_threshold * std_val:
                anomalies.append(AnomalyDetails(
                    satellite_id=satellite_id,
                    anomaly_model=self.__class__.__name__,
                    time=time,
                    time_end=time,  # Point anomaly: start and end coincide.
                    metric=channel,
                    value=current_value,
                    message=f"{channel} value {current_value} deviates from mean {mean_val:.2f} by more than "
                            f"{self.std_threshold} standard deviations ({std_val:.2f})."
                ))

        if anomalies:
            return True, anomalies
        return False, []

    def save_model(self, model_path: str):
        """Persist model weights or parameters. No-op for this statistical model."""
        pass
85 changes: 54 additions & 31 deletions anomaly-detection-container/models/satellite/outlier_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,96 +28,119 @@ def load_model(self):
"""
pass

def detect(self, time, satellite_id, data):
    """
    Detect outliers in the incoming data.

    Parameters:
        time (int): The timestamp of the data point.
        satellite_id (int): The ID of the satellite.
        data (dict): A dictionary containing telemetry data fields and their values.

    Returns:
        bool: True if an anomaly is detected, False otherwise.
        AnomalyDetails | list[AnomalyDetails] | None: A single AnomalyDetails
            when a configured metric fires; a list when scanning all fields;
            None when nothing is detected.
    """
    logging.info(f'Outlier detection model reports: {self.metric} - {self.threshold}')
    self.satellite_id = satellite_id  # Store satellite_id for use in detect_field

    if self.metric is not None and not isinstance(self.metric, list):
        # A single configured metric: check only that field.
        anomaly_data = self.detect_field(time, self.metric, data)
        if anomaly_data is not None:
            return True, anomaly_data
        else:
            return False, None

    elif isinstance(self.metric, list):
        # A fixed list of metrics: report the first anomaly found.
        for field in self.metric:
            logging.info(f"--------------------------------- {field}")
            anomaly_data = self.detect_field(time, field, data)
            if anomaly_data is not None:
                return True, anomaly_data
    else:
        # No metric configured: scan every field and collect all anomalies.
        anomalies = []
        for field in data:
            anomaly_data = self.detect_field(time, field, data)
            if anomaly_data is not None:
                anomalies.append(anomaly_data)

        if anomalies:
            return True, anomalies

    return False, None

def detect_field(self, time, field, data):
    """
    Detect outliers in a specific field of the incoming data.

    Parameters:
        time (int): The timestamp of the data point.
        field (str): The field to check for outliers.
        data (dict): A dictionary containing the payload fields and their values.

    Returns:
        AnomalyDetails: Details of the anomaly if detected. Otherwise, return None
    """
    field_name = field
    field_value = data.get(field_name, None)
    logging.info(f'Analyzing field: {field_name} - Value: {field_value}')

    if field_value is None:
        return None

    # Skip non-numeric values
    if not isinstance(field_value, (int, float)):
        return None

    # Initialize tracking for this field if it's the first time we see it
    if field_name not in self.means:
        self.means[field_name] = 0
        self.variances[field_name] = 0
        self.counts[field_name] = 0
        self.anomaly_start[field_name] = None

    # Update running statistics.
    # NOTE(review): the original comment calls this Welford's online algorithm,
    # but the variance update uses the *new* mean in the squared term, which is
    # an approximation rather than textbook Welford — confirm this is acceptable.
    self.means[field_name] = (self.means[field_name] * self.counts[field_name] + field_value) / (self.counts[field_name] + 1)
    self.variances[field_name] = (self.variances[field_name] * (self.counts[field_name]) +
                                  (field_value - self.means[field_name]) ** 2) / (self.counts[field_name] + 1)
    self.counts[field_name] += 1

    mean = self.means[field_name]
    std = self.variances[field_name] ** 0.5

    # Avoid division by zero
    if std == 0:
        return None

    # Calculate z-score
    z_score = (field_value - mean) / std

    # if it is an anomaly
    if abs(z_score) > self.threshold:

        # Check if anomaly already started; track its start so the reported
        # [time, time_end] interval spans the whole ongoing anomaly.
        if field_name not in self.anomaly_start or self.anomaly_start[field_name] is None:
            self.anomaly_start[field_name] = time
            ending_time = time
        else:
            ending_time = time
            time = self.anomaly_start[field_name]

        anomaly_details = AnomalyDetails(
            satellite_id=self.satellite_id,
            anomaly_model=self.__class__.__name__,
            time=time,
            metric=field_name,
            value=field_value,
            time_end=ending_time,
            message=f'{field_name} outlier detected with z-score {z_score:.2f}'
        )

        return anomaly_details
    else:
        self.anomaly_start[field_name] = None  # Reset the anomaly start time

    return None


def save_model(self, model_path: str):
    """Persist model weights or parameters. No-op: this z-score model keeps only running statistics."""
    pass
Loading