-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver_support.py
More file actions
46 lines (36 loc) · 1.64 KB
/
server_support.py
File metadata and controls
46 lines (36 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import datetime
import socketserver
import threading
from typing import Any
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
daemon_threads = True
class AdminGuard:
WINDOW_SECONDS = 300
MAX_FAILED_IN_WINDOW = 5
def __init__(self, database: Any):
self.database = database
self._lock = threading.Lock()
def register_failure(self, ip: str) -> None:
now_dt = datetime.datetime.now(datetime.timezone.utc)
with self._lock:
fail_count, first_fail = self.database.get_admin_fail_state(ip)
if first_fail is None or (now_dt - first_fail).total_seconds() > self.WINDOW_SECONDS:
self.database.set_admin_fail_state(ip, 1, now_dt)
return
self.database.set_admin_fail_state(ip, fail_count + 1, first_fail)
def register_success(self, ip: str) -> None:
with self._lock:
self.database.clear_admin_fail_state(ip)
def can_attempt(self, ip: str) -> tuple[bool, int]:
now_dt = datetime.datetime.now(datetime.timezone.utc)
with self._lock:
fail_count, first_fail = self.database.get_admin_fail_state(ip)
if fail_count < self.MAX_FAILED_IN_WINDOW or first_fail is None:
return True, 0
elapsed = (now_dt - first_fail).total_seconds()
if elapsed > self.WINDOW_SECONDS:
self.database.clear_admin_fail_state(ip)
return True, 0
retry_after = int(self.WINDOW_SECONDS - elapsed)
return False, max(retry_after, 1)