From 4616e63c2b1d8c61cd913a1e96e7060c5f2846bd Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Thu, 27 Mar 2025 16:19:26 -0400 Subject: [PATCH 1/9] NASA SMAP/MSL dataset download --- .../data/data_install_script.py | 15 +++++++++++++++ anomaly-detection-container/requirements.txt | 3 ++- 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 anomaly-detection-container/data/data_install_script.py diff --git a/anomaly-detection-container/data/data_install_script.py b/anomaly-detection-container/data/data_install_script.py new file mode 100644 index 0000000..9de4ad8 --- /dev/null +++ b/anomaly-detection-container/data/data_install_script.py @@ -0,0 +1,15 @@ +import kagglehub +import os + +current_path = os.path.dirname(os.path.abspath(__file__)) + +# check if the nasa-smap-msl directory exists +if not os.path.exists(os.path.join(current_path, "nasa-smap-msl")): + os.makedirs(os.path.join(current_path, "nasa-smap-msl")) + +# get absolute path to the nasa-smap-msl directory +nasa_smap_msl_path = os.path.join(current_path, "nasa-smap-msl") + +path = kagglehub.dataset_download("patrickfleith/nasa-anomaly-detection-dataset-smap-msl", path=nasa_smap_msl_path) + +print("Path to NASA SMAP/MSL dataset files:", path) \ No newline at end of file diff --git a/anomaly-detection-container/requirements.txt b/anomaly-detection-container/requirements.txt index 8b503c7..8a00cf2 100644 --- a/anomaly-detection-container/requirements.txt +++ b/anomaly-detection-container/requirements.txt @@ -1,4 +1,5 @@ kafka-python psycopg2-binary influxdb -numpy \ No newline at end of file +numpy +kagglehub \ No newline at end of file From 86cf43f119982d74e290ddf8afde6a18cb83fa99 Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Sat, 5 Apr 2025 19:56:43 -0400 Subject: [PATCH 2/9] Revised constellation outlier detection --- .../anomaly_manager.py | 5 +- .../data_install_script.py | 0 .../constellation_anomaly_detection_model.py | 19 ++-- .../constellation_outlier_detection.py | 103 ++++++++++++------ .../satellite_anomaly_detection_model.py | 5 + 5 files changed, 87 insertions(+), 45 deletions(-) rename anomaly-detection-container/data/{ => nasa-smap-msl}/data_install_script.py (100%) diff --git a/anomaly-detection-container/anomaly_manager.py b/anomaly-detection-container/anomaly_manager.py index 03cba0d..1b32f2d 100644 --- a/anomaly-detection-container/anomaly_manager.py +++ b/anomaly-detection-container/anomaly_manager.py @@ -144,7 +144,10 @@ def process_data(self, data): # Process constellation-level anomalies for model in self.constellation_models: try: - is_anomaly, details = model.detect(data) + timestep = data.get('time', 0) + this_satellite_id = data.get('satellite_id', 0) + channel_data = data.get('data', {}) + is_anomaly, details = model.detect(timestep, this_satellite_id, channel_data) if is_anomaly: # details could be a list of anomalies if isinstance(details.get('anomalies'), list): diff --git a/anomaly-detection-container/data/data_install_script.py b/anomaly-detection-container/data/nasa-smap-msl/data_install_script.py similarity index 100% rename from anomaly-detection-container/data/data_install_script.py rename to anomaly-detection-container/data/nasa-smap-msl/data_install_script.py diff --git a/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py b/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py index 145e215..fb3a060 100644 --- a/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py +++ b/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py @@ -19,22 +19,18 @@ class ConstellationAnomalyDetectionModel(ABC): """ @abstractmethod - def detect(self, data) -> tuple[bool, AnomalyDetails]: + def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]: """ Process incoming data and detect anomalies at the constellation level. Parameters: - data (dict): A dictionary containing telemetry data. Data format: - { - 'time': number, - 'satellite_id': number, - 'data': dict - } - Since data comes in one satellite at a time, this class will need to track the satellites together. + time (int): The time of the data point. + satellite_id (int): The ID of the satellite. + data (dict): A dictionary containing the telemetry data from any channels at this timestep. Returns: bool: True if anomaly is detected, False otherwise. - dict: Details of the anomaly if detected. + list[AnomalyDetails]: Details of the anomaly (or multiple anomalies) if detected. Otherwise, return None. """ pass @@ -44,3 +40,8 @@ def load_model(self): Load or initialize the model. This method can be used to load pre-trained models. """ pass + + @abstractmethod + def save_model(self, model_path: str): + """Persist model weights or parameters.""" + pass diff --git a/anomaly-detection-container/models/constellation/constellation_outlier_detection.py b/anomaly-detection-container/models/constellation/constellation_outlier_detection.py index bc3d7e5..440f6fd 100644 --- a/anomaly-detection-container/models/constellation/constellation_outlier_detection.py +++ b/anomaly-detection-container/models/constellation/constellation_outlier_detection.py @@ -1,61 +1,94 @@ -from models.constellation.constellation_anomaly_detection_model import ConstellationAnomalyDetectionModel +from models.constellation.constellation_anomaly_detection_model import ConstellationAnomalyDetectionModel, AnomalyDetails import numpy as np class ConstellationOutlierDetection(ConstellationAnomalyDetectionModel): """ - Example constellation-level anomaly detection model using statistical outliers. + Example constellation-level anomaly detection model using statistical outliers over a sliding time window. """ - def __init__(self, std_threshold=2): + def __init__(self, std_threshold=2, window_seconds=30): """ Parameters: - std_threshold (float): The threshold for detecting outliers based on standard deviation. + std_threshold (float): Threshold (in multiples of standard deviation) for detecting outliers. + window_seconds (int): Duration (in seconds) of the sliding window over which to calculate statistics. """ self.std_threshold = std_threshold + self.window = window_seconds + + # A dictionary to store history of readings for each channel. + # Keys are channel names; values are lists of tuples (time, value). + self.channel_history = {} self.load_model() def load_model(self): """ Initialize any required parameters or load pre-trained models. + For a simple statistical model, no initialization is needed. """ - # For a simple statistical model, no initialization is needed pass - def detect(self, data): + def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]: """ - Detect anomalies based on the average velocity of the constellation. - - data (dict): A dictionary containing telemetry data for all satellites. Data format: - { - 'time': number, - 'satellite_id': number, - 'data': dict - } - Since data comes in one satellite at a time, this will need to track the satellites together + Detect anomalies on a per-satellite basis by comparing each channel's current reading against + the running mean and standard deviation computed over a sliding window across all satellites. + + Parameters: + time (int): The time of the data point. + satellite_id (int): The ID of the satellite. + data (dict): A dictionary containing telemetry data channels and their values. + + Returns: + tuple[bool, list[AnomalyDetails]]: + - bool: True if at least one anomaly is detected; False otherwise. + - list[AnomalyDetails]: List of anomaly details instances. """ + anomalies = [] - # if not a list, make it one - if not isinstance(data, list): - data = [data] + # Update internal history for each channel in the current data + for channel, value in data.items(): + # Ensure the value is numeric before updating history + if not isinstance(value, (int, float)): + continue + if channel not in self.channel_history: + self.channel_history[channel] = [] + self.channel_history[channel].append((time, value)) - velocities = [sat['data']['velocity'] for sat in data if 'velocity' in sat['data']] - if not velocities: - return False, {} + # Prune old entries: keep only records within the sliding window + for channel in self.channel_history: + self.channel_history[channel] = [(t, v) for t, v in self.channel_history[channel] if t >= time - self.window] - mean_velocity = np.mean(velocities) - std_velocity = np.std(velocities) + # Check for anomalies in each channel in the current satellite data + for channel, current_value in data.items(): + if not isinstance(current_value, (int, float)): + continue - anomalies = [] - for sat in data: - velocity = sat['data'].get('velocity') - if velocity is not None and abs(velocity - mean_velocity) > self.std_threshold * std_velocity: - anomalies.append({ - 'satellite_id': sat['satellite_id'], - 'metric': 'velocity', - 'value': velocity, - 'message': 'Velocity deviates significantly from constellation average.' - }) + history = self.channel_history.get(channel, []) + # Only consider channels with at least two data points for statistics calculation + if len(history) < 2: + continue + + values = [v for t, v in history] + mean_val = np.mean(values) + std_val = np.std(values) + + # Avoid division by zero and ignore cases with no variation + if std_val == 0: + continue + + # If the current reading is outside of the confidence bounds, flag an anomaly + if abs(current_value - mean_val) > self.std_threshold * std_val: + anomaly = AnomalyDetails( + satellite_id=satellite_id, + anomaly_model=self.__class__.__name__, + time=time, + time_end=time, # In this simple case, time_end is set equal to time. + metric=channel, + value=current_value, + message=f"{channel} value {current_value} deviates from mean {mean_val:.2f} by more than " + f"{self.std_threshold} standard deviations ({std_val:.2f})." + ) + anomalies.append(anomaly) if anomalies: - return True, {'anomalies': anomalies} - return False, {} + return True, anomalies + return False, [] diff --git a/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py b/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py index 381875f..29101d8 100644 --- a/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py +++ b/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py @@ -46,3 +46,8 @@ def load_model(self): Load or initialize the satellite-specific model. This method can be used to load pre-trained models. """ pass + + @abstractmethod + def save_model(self, model_path: str): + """Persist model weights or parameters.""" + pass From 56c8e125c2f18c65da3503747e34233f49da2a08 Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Sat, 5 Apr 2025 20:13:38 -0400 Subject: [PATCH 3/9] standardized single sat detect --- .../anomaly_manager.py | 5 +- .../models/satellite/outlier_detection.py | 81 ++++++++++++------- .../satellite_anomaly_detection_model.py | 10 ++- 3 files changed, 60 insertions(+), 36 deletions(-) diff --git a/anomaly-detection-container/anomaly_manager.py b/anomaly-detection-container/anomaly_manager.py index 1b32f2d..8d9e772 100644 --- a/anomaly-detection-container/anomaly_manager.py +++ b/anomaly-detection-container/anomaly_manager.py @@ -164,7 +164,10 @@ def process_data(self, data): try: logging.info(f"Processing satellite-specific anomalies for satellite {satellite_id} with model {model_name}") logging.info(f"Data: {data}") - is_anomaly, details = model.detect(data) + timestep = data.get('time', 0) + channel_data = data.get('data', {}) + satellite_id = data.get('satellite_id', 0) + is_anomaly, details = model.detect(timestep, satellite_id, channel_data) if is_anomaly: self.record_anomaly(details, True) except Exception as e: diff --git a/anomaly-detection-container/models/satellite/outlier_detection.py b/anomaly-detection-container/models/satellite/outlier_detection.py index 062e2ac..fec90ce 100644 --- a/anomaly-detection-container/models/satellite/outlier_detection.py +++ b/anomaly-detection-container/models/satellite/outlier_detection.py @@ -28,41 +28,45 @@ def load_model(self): """ pass - def detect(self, data): + def detect(self, time, satellite_id, data): """ Detect outliers in the incoming data. Parameters: - data (dict): A dictionary containing telemetry data. Data format: - { - 'time': number, - 'satellite_id': number, - 'data': dict - } + time (int): The timestamp of the data point. + satellite_id (int): The ID of the satellite. + data (dict): A dictionary containing telemetry data fields and their values. Returns: bool: True if an anomaly is detected, False otherwise. - AnomalyDetails: Details of the anomaly if detected. Otherwise, return None. + list[AnomalyDetails]: Details of the anomaly (or multiple) if detected. Otherwise, return None. """ logging.info(f'Outlier detection model reports: {self.metric} - {self.threshold}') + self.satellite_id = satellite_id # Store satellite_id for use in detect_field + if self.metric is not None and not isinstance(self.metric, list): - data = self.detect_field(data['time'], self.metric, data['data']) - if data is not None: - return True, data + anomaly_data = self.detect_field(time, self.metric, data) + if anomaly_data is not None: + return True, anomaly_data else: return False, None elif isinstance(self.metric, list): for field in self.metric: logging.info(f"--------------------------------- {field}") - anomaly_data = self.detect_field(data['time'], field, data['data']) + anomaly_data = self.detect_field(time, field, data) if anomaly_data is not None: return True, anomaly_data else: + anomalies = [] for field in data: - anomaly_data = self.detect_field(data['time'], field, data['data']) + anomaly_data = self.detect_field(time, field, data) if anomaly_data is not None: - return True, anomaly_data + anomalies.append(anomaly_data) + + if anomalies: + return True, anomalies + return False, None def detect_field(self, time, field, data): @@ -70,54 +74,69 @@ def detect_field(self, time, field, data): Detect outliers in a specific field of the incoming data. Parameters: + time (int): The timestamp of the data point. field (str): The field to check for outliers. - data (dict): A dictionary containing the payload. This is the 'data' field in the whole telemetry packet. + data (dict): A dictionary containing the payload fields and their values. Returns: AnomalyDetails: Details of the anomaly if detected. Otherwise, return None """ field_name = field - field = data.get(field, None) - logging.info(f'YEEEEEEEEEEEEEEE: {field} - {self.means} - {self.variances} - {self.counts}') - if field is None: + field_value = data.get(field, None) + logging.info(f'Analyzing field: {field_name} - Value: {field_value}') + + if field_value is None: + return None + + # Skip non-numeric values + if not isinstance(field_value, (int, float)): return None - # keep running mean and std for the field + # Initialize tracking for this field if it's the first time we see it if field_name not in self.means: self.means[field_name] = 0 self.variances[field_name] = 0 self.counts[field_name] = 0 + self.anomaly_start[field_name] = None - self.means[field_name] = (self.means[field_name] * self.counts[field_name] + field) / (self.counts[field_name] + 1) - # Welford's online algorithm for variance - self.variances[field_name] = (self.variances[field_name] * (self.counts[field_name]) + (field - self.means[field_name]) ** 2) / (self.counts[field_name] + 1) + # Update running statistics using Welford's online algorithm + self.means[field_name] = (self.means[field_name] * self.counts[field_name] + field_value) / (self.counts[field_name] + 1) + self.variances[field_name] = (self.variances[field_name] * (self.counts[field_name]) + + (field_value - self.means[field_name]) ** 2) / (self.counts[field_name] + 1) self.counts[field_name] += 1 mean = self.means[field_name] std = self.variances[field_name] ** 0.5 + + # Avoid division by zero + if std == 0: + return None - # Simple z-score based outlier detection - z_score = (field - mean) / std + # Calculate z-score + z_score = (field_value - mean) / std # if it is an anomaly if abs(z_score) > self.threshold: - - # check if anomaly already started - if self.anomaly_start[field_name] is None: + # Check if anomaly already started + if field_name not in self.anomaly_start or self.anomaly_start[field_name] is None: self.anomaly_start[field_name] = time ending_time = time else: ending_time = time time = self.anomaly_start[field_name] - anomaly_details = AnomalyDetails( - satellite_id=self.satellite_id, anomaly_model=self.__class__.__name__, time=time, - metric=field_name, value=field, time_end=ending_time, + satellite_id=self.satellite_id, + anomaly_model=self.__class__.__name__, + time=time, + metric=field_name, + value=field_value, + time_end=ending_time, message=f'{field_name} outlier detected with z-score {z_score:.2f}' ) return anomaly_details else: - self.anomaly_start[field_name] = None # reset the anomaly start time + self.anomaly_start[field_name] = None # Reset the anomaly start time + return None diff --git a/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py b/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py index 29101d8..200e035 100644 --- a/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py +++ b/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py @@ -27,16 +27,18 @@ def __init__(self, **kwargs): self.load_model() @abstractmethod - def detect(self, data) -> tuple[bool, AnomalyDetails]: + def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]: """ Process incoming data for a specific satellite and detect anomalies. Parameters: - data (dict): A dictionary containing telemetry data for a single satellite. + time (int): The time of the data point. + satellite_id (int): The ID of the satellite. + data (dict): A dictionary containing the telemetry data from any channels at this timestep. Returns: - bool: True if an anomaly is detected, False otherwise. - AnomalyDetails: Details of the anomaly if detected. Otherwise, return None. + bool: True if anomaly is detected, False otherwise. + list[AnomalyDetails]: Details of the anomaly (or multiple anomalies) if detected. Otherwise, return None. """ pass From 4e9fddae1e1ab55d3b02054375dcca8b1bb8f5fa Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Sun, 6 Apr 2025 16:46:56 -0400 Subject: [PATCH 4/9] Create model_interface_tests.py --- .../tests/model_interface_tests.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 anomaly-detection-container/tests/model_interface_tests.py diff --git a/anomaly-detection-container/tests/model_interface_tests.py b/anomaly-detection-container/tests/model_interface_tests.py new file mode 100644 index 0000000..7642574 --- /dev/null +++ b/anomaly-detection-container/tests/model_interface_tests.py @@ -0,0 +1,55 @@ +import os +import tempfile + +def run_generic_satellite_model_tests(model_factory): + """ + Run generic tests on a SatelliteAnomalyDetectionModel implementation. + model_factory should be a callable returning an instance of the model. + """ + # Create a model instance + model = model_factory() + + # --- Test the detect() method with non-anomalous fake data --- + detected, details = model.detect(time=100, satellite_id=1, data={"test_metric": 0}) + assert isinstance(detected, bool), "detect() should return a bool as first element" + assert isinstance(details, list), "detect() should return a list as second element" + + # --- Test the detect() method with clearly anomalous fake data --- + # Note: The model’s own logic will decide whether an anomaly is flagged. + detected, details = model.detect(time=101, satellite_id=1, data={"test_metric": 9999}) + # If an anomaly is flagged, the details should be a list of objects with the expected attributes. + if detected: + for anomaly in details: + for field in ["satellite_id", "anomaly_model", "time", "time_end", "metric", "value", "message"]: + assert hasattr(anomaly, field), f"Anomaly detail missing expected field: {field}" + + # --- Test save_model() --- + with tempfile.TemporaryDirectory() as tmpdirname: + model_path = os.path.join(tmpdirname, "model_state.txt") + model.save_model(model_path) + assert os.path.exists(model_path), "save_model() should create a file at the given path" + +def run_generic_constellation_model_tests(model_factory): + """ + Run generic tests on a ConstellationAnomalyDetectionModel implementation. + model_factory should be a callable returning an instance of the model. + """ + model = model_factory() + + # --- Test the detect() method with non-anomalous fake data --- + detected, details = model.detect(time=200, satellite_id=2, data={"constellation_metric": 0}) + assert isinstance(detected, bool), "detect() should return a bool as first element" + assert isinstance(details, list), "detect() should return a list as second element" + + # --- Test the detect() method with clearly anomalous fake data --- + detected, details = model.detect(time=201, satellite_id=2, data={"constellation_metric": 9999}) + if detected: + for anomaly in details: + for field in ["satellite_id", "anomaly_model", "time", "time_end", "metric", "value", "message"]: + assert hasattr(anomaly, field), f"Anomaly detail missing expected field: {field}" + + # --- Test save_model() --- + with tempfile.TemporaryDirectory() as tmpdirname: + model_path = os.path.join(tmpdirname, "constellation_model_state.txt") + model.save_model(model_path) + assert os.path.exists(model_path), "save_model() should create a file at the given path" From bebd50d4c4270e50b52d690755908b2e38b74735 Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Mon, 7 Apr 2025 12:11:07 -0400 Subject: [PATCH 5/9] added interface tests --- .../constellation_anomaly_detection_model.py | 2 +- .../constellation_outlier_detection.py | 6 +- .../models/satellite/outlier_detection.py | 4 ++ .../satellite_anomaly_detection_model.py | 2 +- .../tests/all_tests.py | 35 +++++++++ .../tests/interface_tests.py | 71 +++++++++++++++++++ .../tests/model_interface_tests.py | 55 -------------- 7 files changed, 117 insertions(+), 58 deletions(-) create mode 100644 anomaly-detection-container/tests/all_tests.py create mode 100644 anomaly-detection-container/tests/interface_tests.py delete mode 100644 anomaly-detection-container/tests/model_interface_tests.py diff --git a/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py b/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py index fb3a060..983ac05 100644 --- a/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py +++ b/anomaly-detection-container/models/constellation/constellation_anomaly_detection_model.py @@ -19,7 +19,7 @@ class ConstellationAnomalyDetectionModel(ABC): """ @abstractmethod - def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]: + def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails] | None]: """ Process incoming data and detect anomalies at the constellation level. diff --git a/anomaly-detection-container/models/constellation/constellation_outlier_detection.py b/anomaly-detection-container/models/constellation/constellation_outlier_detection.py index 440f6fd..e3f0111 100644 --- a/anomaly-detection-container/models/constellation/constellation_outlier_detection.py +++ b/anomaly-detection-container/models/constellation/constellation_outlier_detection.py @@ -14,7 +14,7 @@ def __init__(self, std_threshold=2, window_seconds=30): """ self.std_threshold = std_threshold self.window = window_seconds - + # A dictionary to store history of readings for each channel. # Keys are channel names; values are lists of tuples (time, value). self.channel_history = {} @@ -92,3 +92,7 @@ def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]: if anomalies: return True, anomalies return False, [] + + + def save_model(self, model_path: str): + pass \ No newline at end of file diff --git a/anomaly-detection-container/models/satellite/outlier_detection.py b/anomaly-detection-container/models/satellite/outlier_detection.py index fec90ce..d421787 100644 --- a/anomaly-detection-container/models/satellite/outlier_detection.py +++ b/anomaly-detection-container/models/satellite/outlier_detection.py @@ -140,3 +140,7 @@ def detect_field(self, time, field, data): self.anomaly_start[field_name] = None # Reset the anomaly start time return None + + + def save_model(self, model_path: str): + pass diff --git a/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py b/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py index 200e035..356105a 100644 --- a/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py +++ b/anomaly-detection-container/models/satellite/satellite_anomaly_detection_model.py @@ -27,7 +27,7 @@ def __init__(self, **kwargs): self.load_model() @abstractmethod - def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails]]: + def detect(self, time, satellite_id, data) -> tuple[bool, list[AnomalyDetails] | None]: """ Process incoming data for a specific satellite and detect anomalies. diff --git a/anomaly-detection-container/tests/all_tests.py b/anomaly-detection-container/tests/all_tests.py new file mode 100644 index 0000000..5872b76 --- /dev/null +++ b/anomaly-detection-container/tests/all_tests.py @@ -0,0 +1,35 @@ +from interface_tests import ( + run_generic_satellite_model_tests, + run_generic_constellation_model_tests, +) + +import os +import sys +# === IMPORT MODELS BELOW === +print(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from models.satellite.outlier_detection import OutlierDetectionModel +from models.constellation.constellation_outlier_detection import ConstellationOutlierDetection + +# === FACTORY FUNCTIONS FOR EACH MODEL === + +def sat_outlier_factory(): + return OutlierDetectionModel() + +def const_outlier_factory(): + return ConstellationOutlierDetection() + +# === RUN TESTS FOR EACH MODEL === + +def test_outlier_model(): + run_generic_satellite_model_tests(sat_outlier_factory) + +def test_system_wide_model(): + run_generic_constellation_model_tests(const_outlier_factory) + + +# Optionally, if you want to be able to run this file standalone: +if __name__ == "__main__": + test_outlier_model() + test_system_wide_model() + print("All tests passed.") diff --git a/anomaly-detection-container/tests/interface_tests.py b/anomaly-detection-container/tests/interface_tests.py new file mode 100644 index 0000000..2c2f4c4 --- /dev/null +++ b/anomaly-detection-container/tests/interface_tests.py @@ -0,0 +1,71 @@ +import os +import tempfile + +def run_generic_satellite_model_tests(model_factory): + """ + Run generic tests on a SatelliteAnomalyDetectionModel implementation. + model_factory should be a callable returning an instance of the model. + """ + # Create a model instance + model = model_factory() + + # --- Test the detect() method with non-anomalous fake data --- + detected, details = model.detect(time=100, satellite_id=1, data={"test_metric": 0}) + assert isinstance(detected, bool), "detect() should return a bool as first element" + assert isinstance(details, list) or details is None, "detect() should return a list or None as second element" + + # --- Test the detect() method with clearly anomalous fake data --- + # Note: The model’s own logic will decide whether an anomaly is flagged. + detected, details = model.detect(time=101, satellite_id=1, data={"test_metric": 1}) + detected, details = model.detect(time=102, satellite_id=1, data={"test_metric": 2}) + detected, details = model.detect(time=103, satellite_id=1, data={"test_metric": 0}) + detected, details = model.detect(time=104, satellite_id=1, data={"test_metric": 1}) + detected, details = model.detect(time=105, satellite_id=1, data={"test_metric": 0}) + detected, details = model.detect(time=106, satellite_id=1, data={"test_metric": 999999}) + # If an anomaly is flagged, the details should be a list of objects with the expected attributes. + if detected: + for anomaly in details: + for field in ["satellite_id", "anomaly_model", "time", "time_end", "metric", "value", "message"]: + assert hasattr(anomaly, field), f"Anomaly detail missing expected field: {field}" + + # --- Test save_model() --- + # with tempfile.TemporaryDirectory() as tmpdirname: + # model_path = os.path.join(tmpdirname, "model_state.txt") + # model.save_model(model_path) + # assert os.path.exists(model_path), "save_model() should create a file at the given path" + +def run_generic_constellation_model_tests(model_factory): + """ + Run generic tests on a ConstellationAnomalyDetectionModel implementation. + model_factory should be a callable returning an instance of the model. + """ + model = model_factory() + + # --- Test the detect() method with non-anomalous fake data --- + detected, details = model.detect(time=200, satellite_id=2, data={"constellation_metric": 0}) + assert isinstance(detected, bool), "detect() should return a bool as first element" + assert isinstance(details, list) or details is None, "detect() should return a list or None as second element" + + # --- Test the detect() method with clearly anomalous fake data --- + detected, details = model.detect(time=101, satellite_id=1, data={"test_metric": 1}) + detected, details = model.detect(time=101, satellite_id=2, data={"test_metric": 1}) + detected, details = model.detect(time=102, satellite_id=1, data={"test_metric": 2}) + detected, details = model.detect(time=102, satellite_id=2, data={"test_metric": 2}) + detected, details = model.detect(time=103, satellite_id=1, data={"test_metric": 0}) + detected, details = model.detect(time=103, satellite_id=2, data={"test_metric": 0}) + detected, details = model.detect(time=104, satellite_id=1, data={"test_metric": 1}) + detected, details = model.detect(time=104, satellite_id=2, data={"test_metric": 1}) + detected, details = model.detect(time=105, satellite_id=1, data={"test_metric": 0}) + detected, details = model.detect(time=105, satellite_id=2, data={"test_metric": 0}) + detected, details = model.detect(time=106, satellite_id=1, data={"test_metric": 2}) + detected, details = model.detect(time=106, satellite_id=2, data={"test_metric": 999999}) + if detected: + for anomaly in details: + for field in ["satellite_id", "anomaly_model", "time", "time_end", "metric", "value", "message"]: + assert hasattr(anomaly, field), f"Anomaly detail missing expected field: {field}" + + # --- Test save_model() --- + # with tempfile.TemporaryDirectory() as tmpdirname: + # model_path = os.path.join(tmpdirname, "constellation_model_state.txt") + # model.save_model(model_path) + # assert os.path.exists(model_path), "save_model() should create a file at the given path" diff --git a/anomaly-detection-container/tests/model_interface_tests.py b/anomaly-detection-container/tests/model_interface_tests.py deleted file mode 100644 index 7642574..0000000 --- a/anomaly-detection-container/tests/model_interface_tests.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -import tempfile - -def run_generic_satellite_model_tests(model_factory): - """ - Run generic tests on a SatelliteAnomalyDetectionModel implementation. - model_factory should be a callable returning an instance of the model. - """ - # Create a model instance - model = model_factory() - - # --- Test the detect() method with non-anomalous fake data --- - detected, details = model.detect(time=100, satellite_id=1, data={"test_metric": 0}) - assert isinstance(detected, bool), "detect() should return a bool as first element" - assert isinstance(details, list), "detect() should return a list as second element" - - # --- Test the detect() method with clearly anomalous fake data --- - # Note: The model’s own logic will decide whether an anomaly is flagged. - detected, details = model.detect(time=101, satellite_id=1, data={"test_metric": 9999}) - # If an anomaly is flagged, the details should be a list of objects with the expected attributes. - if detected: - for anomaly in details: - for field in ["satellite_id", "anomaly_model", "time", "time_end", "metric", "value", "message"]: - assert hasattr(anomaly, field), f"Anomaly detail missing expected field: {field}" - - # --- Test save_model() --- - with tempfile.TemporaryDirectory() as tmpdirname: - model_path = os.path.join(tmpdirname, "model_state.txt") - model.save_model(model_path) - assert os.path.exists(model_path), "save_model() should create a file at the given path" - -def run_generic_constellation_model_tests(model_factory): - """ - Run generic tests on a ConstellationAnomalyDetectionModel implementation. - model_factory should be a callable returning an instance of the model. - """ - model = model_factory() - - # --- Test the detect() method with non-anomalous fake data --- - detected, details = model.detect(time=200, satellite_id=2, data={"constellation_metric": 0}) - assert isinstance(detected, bool), "detect() should return a bool as first element" - assert isinstance(details, list), "detect() should return a list as second element" - - # --- Test the detect() method with clearly anomalous fake data --- - detected, details = model.detect(time=201, satellite_id=2, data={"constellation_metric": 9999}) - if detected: - for anomaly in details: - for field in ["satellite_id", "anomaly_model", "time", "time_end", "metric", "value", "message"]: - assert hasattr(anomaly, field), f"Anomaly detail missing expected field: {field}" - - # --- Test save_model() --- - with tempfile.TemporaryDirectory() as tmpdirname: - model_path = os.path.join(tmpdirname, "constellation_model_state.txt") - model.save_model(model_path) - assert os.path.exists(model_path), "save_model() should create a file at the given path" From 97d3e4673dfa2e0b45f346a24eab26a2eeee84b1 Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Mon, 7 Apr 2025 14:07:52 -0400 Subject: [PATCH 6/9] added github workflows --- .github/workflows/python-app.yml | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 .github/workflows/python-app.yml diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml new file mode 100644 index 0000000..80c9a8b --- /dev/null +++ b/.github/workflows/python-app.yml @@ -0,0 +1,29 @@ +name: Run Model Interface Tests + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Setup Python 3.10 + uses: actions/setup-python@v2 + with: + python-version: 3.10 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r anomaly-detection-container/requirements.txt + + - name: Run centralized model tests + run: | + python anomaly-detection-container/tests/run_all_model_tests.py From 14f6459a2303e3dfc8932d9b6ff7f64559b36e46 Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Mon, 7 Apr 2025 14:08:42 -0400 Subject: [PATCH 7/9] fixed github action --- .github/workflows/python-app.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 80c9a8b..3399bb5 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -26,4 +26,4 @@ jobs: - name: Run centralized model tests run: | - python anomaly-detection-container/tests/run_all_model_tests.py + python anomaly-detection-container/tests/all_tests.py From 46418113cae23c4fe17befb55e74153f527418de Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Mon, 7 Apr 2025 15:03:38 -0400 Subject: [PATCH 8/9] Create data_loader.py --- .../pipelines/data_loader.py | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 anomaly-detection-container/pipelines/data_loader.py diff --git a/anomaly-detection-container/pipelines/data_loader.py b/anomaly-detection-container/pipelines/data_loader.py new file mode 100644 index 0000000..167389c --- /dev/null +++ b/anomaly-detection-container/pipelines/data_loader.py @@ -0,0 +1,80 @@ +# pipelines/data_loader.py + +import os +import numpy as np +import pandas as pd +import json +from typing import Union + +def load_data(file_path: str) -> Union[pd.DataFrame, np.ndarray, dict]: + """ + Loads data from a file. Supports CSV, npz, npy, and JSON formats. + Returns: + A DataFrame for CSV/JSON (if tabular), or a numpy array/dict. + """ + ext = os.path.splitext(file_path)[-1].lower() + if ext == ".csv": + return pd.read_csv(file_path) + elif ext == ".npy": + return np.load(file_path, allow_pickle=True) + elif ext == ".npz": + return np.load(file_path, allow_pickle=True) + elif ext == ".json": + with open(file_path, "r") as f: + return json.load(f) + else: + raise ValueError(f"Unsupported file type: {ext}") + +class DataLoader: + def __init__(self, data, batch_size=32, shuffle=True): + """ + A simple DataLoader that supports pandas DataFrame, numpy arrays, and lists. + """ + self.data = data + self.batch_size = batch_size + self.shuffle = shuffle + + if isinstance(data, pd.DataFrame): + self.dataset = data.reset_index(drop=True) + self.indices = list(self.dataset.index) + elif isinstance(data, np.ndarray) or isinstance(data, list): + self.dataset = data + self.indices = list(range(len(data))) + else: + raise ValueError("Unsupported data type for DataLoader.") + + def __iter__(self): + self.current_index = 0 + if self.shuffle: + np.random.shuffle(self.indices) + return self + + def __next__(self): + if self.current_index >= len(self.indices): + raise StopIteration + batch_indices = self.indices[self.current_index:self.current_index+self.batch_size] + self.current_index += self.batch_size + if isinstance(self.dataset, pd.DataFrame): + return self.dataset.iloc[batch_indices] + elif isinstance(self.dataset, np.ndarray): + return self.dataset[batch_indices] + elif isinstance(self.dataset, list): + return [self.dataset[i] for i in batch_indices] + +def split_data(data, test_ratio=0.2): + """ + Splits the data into training and testing sets. + Works with pandas DataFrame and numpy array. + """ + if isinstance(data, pd.DataFrame): + data = data.sample(frac=1).reset_index(drop=True) # Shuffle + split_idx = int(len(data) * (1 - test_ratio)) + return data.iloc[:split_idx], data.iloc[split_idx:] + elif isinstance(data, np.ndarray): + indices = np.arange(len(data)) + np.random.shuffle(indices) + split_idx = int(len(data) * (1 - test_ratio)) + train_idx, test_idx = indices[:split_idx], indices[split_idx:] + return data[train_idx], data[test_idx] + else: + raise ValueError("Unsupported data type for splitting.") From d185cdedf77b796fcc7edb3785f80d64e0c03a69 Mon Sep 17 00:00:00 2001 From: varun unnithan Date: Mon, 7 Apr 2025 15:13:43 -0400 Subject: [PATCH 9/9] fix python version in github actions --- .github/workflows/python-app.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 3399bb5..f87fef9 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -14,10 +14,10 @@ jobs: - name: Checkout repository uses: actions/checkout@v2 - - name: Setup Python 3.10 - uses: actions/setup-python@v2 + - name: Set up Python + uses: actions/setup-python@v4 with: - python-version: 3.10 + python-version: '3.12' - name: Install dependencies run: |