-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfeature_extractor.py
More file actions
64 lines (56 loc) · 2.42 KB
/
feature_extractor.py
File metadata and controls
64 lines (56 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from scapy.all import IP, TCP, UDP, ICMP
import pandas as pd
class FeatureExtractor:
"""
Extracts relevant features from network packets for the IDS models.
"""
@staticmethod
def extract_features(packets):
"""
Processes a list of Scapy packets and extracts features.
Args:
packets (list): A list of Scapy packet objects.
Returns:
A pandas DataFrame with the extracted features.
"""
feature_list = []
for packet in packets:
features = {
'protocol': 0,
'src_port': 0,
'dst_port': 0,
'length': len(packet),
'flags_fin': 0,
'flags_syn': 0,
'flags_rst': 0,
'flags_ack': 0
}
if packet.haslayer(IP):
ip_layer = packet.getlayer(IP)
features['src_ip'] = ip_layer.src
features['dst_ip'] = ip_layer.dst
if packet.haslayer(TCP):
tcp_layer = packet.getlayer(TCP)
features['protocol'] = 6 # TCP protocol number
features['src_port'] = tcp_layer.sport
features['dst_port'] = tcp_layer.dport
# Extract TCP flags
features['flags_fin'] = 1 if 'F' in str(tcp_layer.flags) else 0
features['flags_syn'] = 1 if 'S' in str(tcp_layer.flags) else 0
features['flags_rst'] = 1 if 'R' in str(tcp_layer.flags) else 0
features['flags_ack'] = 1 if 'A' in str(tcp_layer.flags) else 0
elif packet.haslayer(UDP):
udp_layer = packet.getlayer(UDP)
features['protocol'] = 17 # UDP protocol number
features['src_port'] = udp_layer.sport
features['dst_port'] = udp_layer.dport
elif packet.haslayer(ICMP):
features['protocol'] = 1 # ICMP protocol number
if 'src_ip' in features: # Only add packets with an IP layer
feature_list.append(features)
if not feature_list:
return None
df = pd.DataFrame(feature_list)
# Drop non-numeric columns for the model
df = df.drop(columns=['src_ip', 'dst_ip'], errors='ignore')
return df