-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadguard-notifier.py
More file actions
69 lines (57 loc) · 2.42 KB
/
adguard-notifier.py
File metadata and controls
69 lines (57 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import requests
import json
import time
# Config
AGH_URL = "http://127.0.0.1:80/control/querylog?limit=10"
AGH_USER = "ADGUARD_USERNAME"
AGH_PASS = "ADGUARD_PASSWORD"
HA_WEBHOOK_URL = "http://YOUR_HA_IP:8123/api/webhook/..."
COOLDOWN_SECONDS = 60
TARGET_REASONS = "FilteredParental"
url_cooldowns = {}
last_seen_time = ""
reason_list = [r.strip() for r in TARGET_REASONS.split(",")]
def trigger_webhook(domain, client_display, timestamp):
payload = {
"domain": domain,
"client": client_display,
"time": timestamp
}
try:
requests.post(HA_WEBHOOK_URL, json=payload, timeout=5)
except Exception as e:
print(f"Failed to trigger webhook: {e}")
print("Monitoring AdGuard")
while True:
try:
response = requests.get(AGH_URL, auth=(AGH_USER, AGH_PASS), timeout=5)
if response.status_code == 200:
data = response.json()
queries = data.get('data', [])
if queries:
current_time_str = queries[0].get('time')
if current_time_str != last_seen_time:
for q in queries:
if q.get('time') == last_seen_time:
break
if q.get('reason') in reason_list:
ip_addr = q.get('client', 'Unknown IP')
domain = q.get('question', {}).get('name', '')
now = time.time()
# CREATE UNIQUE KEY (Client + Domain)
cooldown_key = f"{ip_addr}:{domain}"
last_notify = url_cooldowns.get(cooldown_key, 0)
if (now - last_notify) > COOLDOWN_SECONDS:
friendly_name = q.get('client_info', {}).get('name', '')
client_display = f"{friendly_name} ({ip_addr})" if friendly_name else ip_addr
timestamp = q.get('time').split('.')[0].replace('T', ' ')
print(f"[{timestamp}] ALERT: {client_display} -> {domain}")
trigger_webhook(domain, client_display, timestamp)
url_cooldowns[cooldown_key] = now
last_seen_time = current_time_str
time.sleep(2)
except KeyboardInterrupt:
break
except Exception as e:
print(f"Error: {e}")
time.sleep(5)