import yaml
from scapy.all import sniff
from settings.telegram import send_to_telegram
import signal
from tcp.http_detect import HTTPAnalyzer
from tcp.TLS import analyzer_tls
import os
from settings.signal_module import signal_handler , cleanup_iptables
from geo.GeoIP import handle_packet
import ipaddress
from geo.GeoSite import analyze_goesite
from tcp.ssh import analyze_ssh
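def packet_callback(packet):
    """Dispatch one sniffed packet to the analyzer of every configured rule.

    Reads the module-level ``rules``, ``configs``, ``domains`` and ``ips``
    that the ``__main__`` block populates before ``sniff()`` is started.

    Args:
        packet: a scapy packet as handed over by ``sniff(prn=...)``.

    Returns:
        None. The analyzers are invoked for their side effects only.
    """
    for rule in rules['rules']:
        # ``rule_type`` instead of ``type``: the name shadowed the builtin.
        # ``.get`` so that a rule without a ``type`` key is reported by the
        # fallback branch instead of raising KeyError inside the sniff
        # callback, which would abort the whole capture.
        rule_type = rule.get('type')
        if rule_type == 'http':
            # NOTE(review): a fresh HTTPAnalyzer is built for every packet and
            # rule, as before; whether it keeps state across packets is not
            # visible here, so it is deliberately not hoisted.
            HTTPAnalyzer(rule).analyze_packet(packet=packet, rule=rule, configs=configs)
        elif rule_type == 'tls':
            analyzer_tls(packet=packet, rule=rule, configs=configs)
        elif rule_type == 'geosite':
            analyze_goesite(packet=packet, rule=rule, configs=configs, domains=domains)
        elif rule_type == 'geoip':
            # geoip rules must name the transport layer they apply to.
            transport = rule['transport']
            handle_packet(packet=packet, rule=rule, configs=configs, ips=ips, transport=transport)
        elif rule_type == 'ssh':
            analyze_ssh(packet=packet, rule=rule, configs=configs)
        else:
            print(f"Unknown rule type: {rule_type}")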
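def load_yaml(file_path):
    """Parse a YAML file and return the deserialized document.

    ``yaml.safe_load`` is used on purpose: it only produces plain data
    (dicts, lists, scalars) and never instantiates arbitrary Python objects
    from the file contents.

    Args:
        file_path: path of the YAML file to read.

    Returns:
        The parsed document, or ``None`` when the file is empty.

    Raises:
        OSError: if the file cannot be opened.
        yaml.YAMLError: if the contents are not valid YAML.
    """
    # Explicit encoding: the default otherwise depends on the process locale.
    with open(file_path, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)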
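if __name__ == "__main__":
    # Paths are relative to the working directory, not to this file, so the
    # script has to be started from the project root.
    configs = load_yaml('./settings/config.yaml')
    interface = configs['io']['interface']
    rules = load_yaml('./settings/rules.yaml')

    # geoip settings
    def list_ips(ip_range):
        """Expand a CIDR range such as ``"10.0.0.0/24"`` into its host addresses.

        Returns the addresses as dotted strings. ``ip_network`` is strict, so
        a range with host bits set (``"10.0.0.1/24"``) raises ``ValueError``.
        """
        network = ipaddress.ip_network(ip_range)
        return [str(ip) for ip in network.hosts()]

    path_geoip = configs['path']['geoip']
    path_geosite = configs['path']['geosite']

    # One CIDR range per line. Blank lines are skipped: ``ip_network("")``
    # raises, so a trailing empty line would otherwise abort startup.
    with open(path_geoip, "r", encoding="utf-8") as f:
        ip_ranges = [stripped for stripped in (line.strip() for line in f) if stripped]

    # Every host address covered by the ranges, as a set of strings.
    # NOTE(review): this materializes each address; a large prefix (a /8 is
    # ~16M entries) becomes very memory hungry — keeping the ip_network
    # objects and testing membership would be cheaper if handle_packet allows it.
    ips = set()
    for ip_range in ip_ranges:
        ips.update(list_ips(ip_range))

    iptable_rules = []  # kept as a module-level name; not used in this file

    # Geosite settings: one domain per line, each mapped to an empty list.
    # Blank lines are skipped so "" does not become a domain key.
    with open(path_geosite, "r", encoding="utf-8") as f:
        domains = {stripped: [] for stripped in (line.strip() for line in f) if stripped}

    print(f"Starting packet capture on interface {interface}...")
    # Route SIGINT/SIGTERM to the shared handler from settings.signal_module.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # store=0: packets are handed to the callback and dropped, not kept in memory.
    if interface == "all":
        sniff(prn=packet_callback, store=0)
    else:
        sniff(iface=interface, prn=packet_callback, store=0)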