-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathids_engine.py
More file actions
81 lines (67 loc) · 3.31 KB
/
ids_engine.py
File metadata and controls
81 lines (67 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from dataset_generator import generate_dataset
class IDSEngine:
    """
    The core AI engine for the Intrusion Detection System.

    Wraps a Random Forest classifier: train it on labelled traffic
    features via ``train_model``, then score new feature rows via
    ``predict``.
    """

    def __init__(self):
        """
        Initializes the IDSEngine with a RandomForestClassifier.
        """
        # RandomForest is robust for this kind of tabular classification;
        # a fixed random_state keeps training runs reproducible.
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)

    def train_model(self, dataset_path=None, test_size=0.2):
        """
        Trains the AI model on a given dataset.

        If no dataset is provided, it generates a sample one.

        Args:
            dataset_path (str, optional): Path to the training dataset (CSV).
            test_size (float): The proportion of the dataset to include in
                the test split.

        Raises:
            ValueError: If the dataset has no 'label' column.
        """
        if dataset_path:
            # In a real-world scenario, you'd use a well-known dataset
            # like CIC-IDS2017 or UNSW-NB15.
            df = pd.read_csv(dataset_path)
        else:
            print("No dataset provided. Generating a sample dataset for demonstration.")
            df = generate_dataset(num_samples=1000)

        # Validate and extract the label BEFORE filtering to numeric
        # columns: the previous order silently discarded a non-numeric
        # (e.g. string-category) label and then raised a misleading
        # "no 'label' column" error. String labels are valid targets
        # for sklearn classifiers.
        if 'label' not in df.columns:
            raise ValueError("Dataset must contain a 'label' column for training.")
        y = df['label']
        # Keep only numeric feature columns. For simplicity, non-numeric
        # columns are dropped; in a real scenario you'd encode them.
        X = df.drop('label', axis=1).select_dtypes(include=['number'])

        # Split data for training and testing (fixed seed for a
        # reproducible split).
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )

        print(f"Training model with {len(X_train)} samples.")
        self.model.fit(X_train, y_train)

        # Evaluate the model on the held-out split as a quick sanity check.
        accuracy = self.model.score(X_test, y_test)
        print(f"Model trained. Accuracy on test data: {accuracy:.2f}")

    def predict(self, features_df):
        """
        Predicts whether a set of packets (as a DataFrame) contains a threat.

        Args:
            features_df (pd.DataFrame): DataFrame of features extracted
                from packets.

        Returns:
            A numpy array of predictions (0 for normal, 1 for threat).

        Raises:
            RuntimeError: If called before ``train_model``.
        """
        # feature_names_in_/classes_ only exist after fit(), so their
        # absence means the model was never trained.
        if not hasattr(self.model, 'classes_'):
            raise RuntimeError("Model has not been trained yet. Call train_model() first.")

        # Align the input to the exact column set and order seen at fit
        # time: reindex adds any missing training columns (filled with 0),
        # drops extras, and fixes the order in one step. The previous
        # manual copy into an empty DataFrame aligned on an empty index
        # and could silently produce a zero-row frame.
        training_cols = self.model.feature_names_in_
        aligned_df = features_df.reindex(columns=training_cols, fill_value=0)
        return self.model.predict(aligned_df)