-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalytics_engine.py
More file actions
180 lines (157 loc) · 7.49 KB
/
analytics_engine.py
File metadata and controls
180 lines (157 loc) · 7.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import datetime
from collections import Counter
from typing import Any, Callable
class SecurityAnalyzer:
def __init__(
self,
database: Any,
target_users: dict[str, str],
filter_rows_by_time: Callable[[list[Any], datetime.datetime | None, datetime.datetime | None], list[Any]],
parse_training_auth_endpoint: Callable[[str], tuple[str | None, str | None]],
department_for_user: Callable[[str], str],
safe_int: Callable[[str | None, int], int],
parse_iso: Callable[[str], datetime.datetime | None],
risk_band: Callable[[float], str],
):
self.database = database
self.target_users = target_users
self.filter_rows_by_time = filter_rows_by_time
self.parse_training_auth_endpoint = parse_training_auth_endpoint
self.department_for_user = department_for_user
self.safe_int = safe_int
self.parse_iso = parse_iso
self.risk_band = risk_band
def compute(
self,
start_dt: datetime.datetime | None = None,
end_dt: datetime.datetime | None = None,
) -> dict:
all_clicks = self.database.get_clicks()
all_logins = self.database.get_logins()
clicks = self.filter_rows_by_time(all_clicks, start_dt, end_dt)
logins = self.filter_rows_by_time(all_logins, start_dt, end_dt)
clicked_users = {row["user_id"] for row in clicks if row["user_id"]}
compromised_users = {row["user_id"] for row in logins if row["user_id"]}
total_targets = max(len(self.target_users), 1)
click_rate = (len(clicked_users) / total_targets) * 100.0
compromise_rate = (len(compromised_users) / total_targets) * 100.0
top_ips = Counter(row["ip"] for row in clicks if row["ip"]).most_common(5)
anomalies = []
for ip, count in Counter(row["ip"] for row in clicks if row["ip"]).items():
if count >= 8:
anomalies.append(f"IP con volume anomalo di click: {ip} ({count})")
by_user = Counter(row["user_id"] for row in logins if row["user_id"])
for user_id, count in by_user.items():
if count >= 3:
anomalies.append(f"Utente con molti tentativi login: {user_id} ({count})")
clicks_by_ip = {}
for row in clicks:
ip = row["ip"]
ts = self.parse_iso(row["time"])
if not ip or ts is None:
continue
clicks_by_ip.setdefault(ip, []).append(ts)
for ip, timestamps in clicks_by_ip.items():
timestamps.sort()
for idx in range(3, len(timestamps)):
if (timestamps[idx] - timestamps[idx - 3]).total_seconds() <= 30:
anomalies.append(f"Pattern burst sospetto da {ip} (>=4 click in 30s)")
break
training_actions = []
user_training_stats: dict[str, dict[str, int]] = {}
department_training_stats: dict[str, dict[str, int]] = {}
for row in clicks:
endpoint = str(row["endpoint"])
scenario, action = self.parse_training_auth_endpoint(endpoint)
if scenario is None or action is None:
continue
user_id = str(row["user_id"] or "unknown")
department = self.department_for_user(user_id)
trusted = self.safe_int(str(row["trusted"]), 0) == 1
training_actions.append(
{
"user_id": user_id,
"department": department,
"scenario": scenario,
"action": action,
"trusted": trusted,
"time": str(row["time"]),
}
)
user_row = user_training_stats.setdefault(user_id, {"total": 0, "report": 0, "continue": 0})
user_row["total"] += 1
user_row[action] += 1
dep_row = department_training_stats.setdefault(department, {"total": 0, "report": 0, "continue": 0})
dep_row["total"] += 1
dep_row[action] += 1
training_user_risk = []
for user_id, stats in user_training_stats.items():
total = max(stats["total"], 1)
score = round((stats["continue"] / total) * 100.0, 2)
training_user_risk.append(
{
"user_id": user_id,
"department": self.department_for_user(user_id),
"total": stats["total"],
"report": stats["report"],
"continue": stats["continue"],
"risk_score": score,
"risk_band": self.risk_band(score),
}
)
training_department_risk = []
for department, stats in department_training_stats.items():
total = max(stats["total"], 1)
score = round((stats["continue"] / total) * 100.0, 2)
training_department_risk.append(
{
"department": department,
"total": stats["total"],
"report": stats["report"],
"continue": stats["continue"],
"risk_score": score,
"risk_band": self.risk_band(score),
}
)
training_user_risk.sort(key=lambda item: (-item["risk_score"], item["user_id"]))
training_department_risk.sort(key=lambda item: (-item["risk_score"], item["department"]))
continue_actions = sum(item["continue"] for item in training_department_risk)
training_continue_rate = (
(continue_actions / max(len(training_actions), 1)) * 100.0
if training_actions
else 0.0
)
return {
"total_click_events": len(clicks),
"total_login_events": len(logins),
"unique_clicked_users": len(clicked_users),
"unique_compromised_users": len(compromised_users),
"click_rate_percent": round(click_rate, 2),
"compromise_rate_percent": round(compromise_rate, 2),
"top_ips": top_ips,
"anomalies": anomalies,
"training_total_actions": len(training_actions),
"training_continue_rate_percent": round(training_continue_rate, 2),
"training_user_risk": training_user_risk,
"training_department_risk": training_department_risk,
"training_last_actions": training_actions[:50],
}
def run(self) -> None:
result = self.compute()
print("\n===== SECURITY ANALYSIS =====\n")
print(f"Click events: {result['total_click_events']}")
print(f"Login events: {result['total_login_events']}")
print(f"Utenti che hanno cliccato: {result['unique_clicked_users']}/{len(self.target_users)}")
print(f"Utenti compromessi: {result['unique_compromised_users']}/{len(self.target_users)}")
print(f"Click rate: {result['click_rate_percent']}%")
print(f"Compromise rate: {result['compromise_rate_percent']}%")
print(f"Training actions: {result['training_total_actions']}")
print("\nTop IP")
for ip, count in result["top_ips"]:
print(f"{ip} -> {count}")
print("\nAnomalie")
if not result["anomalies"]:
print("Nessuna anomalia evidente")
else:
for item in result["anomalies"]:
print(f"- {item}")