This repository was archived by the owner on Jul 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathExplosionDetector.py
More file actions
103 lines (92 loc) · 3.72 KB
/
ExplosionDetector.py
File metadata and controls
103 lines (92 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#
# Copyright (c) 2018, Edgewise Networks Inc. All rights reserved.
#
from sys import float_info
from collections import defaultdict
import heapq
from datetime import datetime
from math import sqrt
from OnlineDeviation import Stdev
from Netflows import Netflow
from HLL import HyperLogLog
from NetflowDetector import NetflowDetector, timestampToDatetime
from heapq import heappush, heappop, heappushpop
# Largest representable finite float on this platform (sys.float_info.max).
# NOTE(review): not referenced by the ExplosionDetector class below -- possibly
# kept for other importers of this module; confirm before removing.
maxfloat = float_info.max
class ExplosionDetector(NetflowDetector):
    """Detect source IPs whose short-term destination fan-out "explodes".

    Compare short-term HLLs to find a (new?) short-term HLL cardinality
    that's N sigmas above all the others => something exploded in the
    current time period.  (For example, at the 5-sigma level, if we want to
    avoid most false alarms.)

    To do this, we keep one HyperLogLog per source IP address and add the
    destination string of every flow from that source to it (presumably
    "ip+port" -- confirm against Netflow.getDestinationString).  ``totalCount``
    counts every flow handed to ``addNetflow``.  When outliers are requested,
    the cardinality of each source is compared with the mean and standard
    deviation over all sources, and the short-term HLLs are then reset.
    """
    __slots__ = ('shortCardDict', 'totalCount', 'topN')

    # Constructor argument for every per-source HyperLogLog.
    # NOTE(review): presumably the number of register-index bits -- confirm
    # against HLL.HyperLogLog.
    _HLL_BITS = 16

    def __init__(self, sigmaCount=5, period=86400, topN=10):
        """Create a detector.

        sigmaCount: how many standard deviations above the mean a source's
                    cardinality must be to count as an outlier.
        period:     forwarded to NetflowDetector (presumably the length of
                    the time period in seconds -- confirm against the base
                    class).
        topN:       how many of the highest-cardinality sources
                    ``getOutliers`` examines.
        """
        super().__init__(sigmaCount, period=period)
        self.topN = topN
        # source IP string -> HyperLogLog of destinations seen this period
        self.shortCardDict = defaultdict(self._newHll)
        self.totalCount = 0

    @classmethod
    def _newHll(cls):
        """Return a fresh, empty short-term HyperLogLog."""
        return HyperLogLog(cls._HLL_BITS)

    def addNetflow(self, netflow):
        """Record one netflow: add its destination to its source IP's HLL."""
        srcip = netflow.getSourceIpString()
        dst = netflow.getDestinationString()
        self.shortCardDict[srcip].add(dst)
        self.totalCount += 1

    def _currentCounts(self):
        """Return {source IP: cardinality of its short-term HLL}."""
        return {key: hll.cardinality()
                for key, hll in self.shortCardDict.items()}

    @staticmethod
    def _meanAndStdev(counts):
        """Return (mean, standard deviation) of an iterable of counts."""
        stats = Stdev()
        for cnt in counts:
            stats.add(cnt)
        return stats.getMean(), stats.getStdev()

    def _sigmasAbove(self, cnt, mean, stdv):
        """Return how many sigmas ``cnt`` lies above ``mean`` if that exceeds
        sigmaCount, else None.
        """
        # A zero deviation means there is no spread to measure against;
        # treating that as "no outlier" also avoids dividing by zero.
        if stdv > 0 and cnt > mean + self.sigmaCount * stdv:
            return (cnt - mean) / stdv
        return None

    def _resetShortTerm(self):
        """Replace every short-term HLL with an empty one.

        NOTE(review): keys are kept, so a source that goes quiet still
        contributes a cardinality of 0 to the next period's mean/stdev and
        the dict never shrinks.  Preserved as-is; confirm it is intended.
        """
        # Only values of existing keys are rebound, so iterating the dict
        # while assigning is safe (its size never changes).
        for key in self.shortCardDict:
            self.shortCardDict[key] = self._newHll()

    def getOutliersAll(self):
        """ must return a dict of (key, sigmas > sigmaCount)

        Examines every tracked source IP, then resets the short-term HLLs.
        """
        outliers = {}
        counts = self._currentCounts()
        if counts:
            mean, stdv = self._meanAndStdev(counts.values())
            # Score the same short-term counts the statistics were built from.
            for key, cnt in counts.items():
                sigs = self._sigmasAbove(cnt, mean, stdv)
                if sigs is not None:
                    outliers[key] = sigs
        self._resetShortTerm()
        return outliers

    def logOutput(self, key, result):
        """ key: an item being tracked
        result: bool -- True if starting above sigmaCount, False if ending above it.
        """
        dt = timestampToDatetime(self.lastTimestamp)
        if result:
            print("%s ::: IP address %s became an outlier for explosion scanning." % (dt, key))
        else:
            print("%s ::: IP address %s is no longer an outlier for explosion scanning." % (dt, key))

    def getOutliers(self):
        """ must return a dict of (key, sigmas > sigmaCount)

        Mean and standard deviation are computed over every tracked source,
        but only the ``topN`` highest-cardinality sources plus the keys
        reported by ``self.timePeriodMap.getActives()`` are scored.  The
        short-term HLLs are reset afterwards.
        """
        outliers = {}
        counts = self._currentCounts()
        if counts:
            mean, stdv = self._meanAndStdev(counts.values())

            # The topN largest (count, key) pairs; ties on count fall back to
            # comparing keys, exactly like a bounded min-heap of tuples would.
            top = heapq.nlargest(
                self.topN, ((cnt, key) for key, cnt in counts.items()))
            for cnt, key in top:
                sigs = self._sigmasAbove(cnt, mean, stdv)
                if sigs is not None:
                    outliers[key] = sigs

            # Re-check the keys getActives() reports using each key's OWN
            # count, so one that fell out of the top N is still scored.
            # (Presumably these are the currently flagged outliers -- confirm
            # against NetflowDetector.)  Keys without a short-term HLL have
            # no count this period and are skipped; `in counts` avoids
            # creating entries through the defaultdict.
            for key in self.timePeriodMap.getActives():
                if key not in counts:
                    continue
                sigs = self._sigmasAbove(counts[key], mean, stdv)
                if sigs is not None:
                    outliers[key] = sigs

        self._resetShortTerm()
        return outliers