-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemon.py
More file actions
1232 lines (976 loc) · 40.9 KB
/
memon.py
File metadata and controls
1232 lines (976 loc) · 40.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# mm_meta:
# name: MeMon
# emoji: 🌐
# language: Python
__version__ = "1.1.3"
"""
MeMon Network Health Monitor for MeshMonitor
Monitors router and DNS health, outputs JSON alerts only when notifications should fire.
Implements failure streak tracking with backoff logic.
"""
# NOTE(review): the string literal above follows the __version__ assignment, so
# Python treats it as a plain expression statement rather than the module
# docstring (the module's __doc__ is not set from it).
import json
import os
import sys
import socket
import struct
import time
import traceback
import ssl
import urllib.request
import urllib.error
from typing import Dict, List, Optional, Tuple, Any
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
# Switch stdout and stderr to UTF-8 when they are not already using it.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, 'reconfigure') and _stream.encoding != 'utf-8':
        try:
            _stream.reconfigure(encoding='utf-8')
        except (AttributeError, ValueError):
            # Best effort: keep the stream's current encoding if it cannot be changed.
            pass
del _stream
# Default configuration values (user config is merged over these by load_config)
DEFAULT_CONFIG = {
    "timeoutMs": 2500,           # timeout handed to each check, in milliseconds
    "mustFailCount": 3,          # failure streak required before a DOWN alert
    "alertBackoffSeconds": 900,  # minimum spacing between DOWN alerts
    "debug": False,
    # Alert message templates; "{{failed}}" is substituted by replace_placeholders
    "messages": {
        "routerDown": "Router is down",
        "ispDown": "All DNS resolvers failed - ISP may be down",
        "upstreamDnsDown": "DNS resolvers failed: {{failed}}",
        "recovery": "Network connectivity restored",
    },
    "routerCheck": {
        "method": "https",
        "host": "192.168.1.1",
        "port": 443,
        "insecureTls": False,
    },
    "dnsChecks": [],
}

# Default state values (used when the state file is missing or unreadable)
DEFAULT_STATE = {
    "failStreak": 0,
    "downNotified": False,
    "lastAlertTs": 0,
    "lastStatus": None,
    "lastFailedDns": [],
}

# MeshMonitor hard timeout limit (seconds)
MESHMONITOR_TIMEOUT = 10

# Safety margin for timeout calculations (seconds)
TIMEOUT_SAFETY_MARGIN = 0.5

# Maximum alert message length (characters)
MAX_MESSAGE_LENGTH = 200

# Default port numbers
DEFAULT_HTTP_PORT = 80
DEFAULT_HTTPS_PORT = 443
DEFAULT_DNS_PORT = 53

# Maximum UDP DNS response size (RFC 1035 standard, without EDNS)
DNS_UDP_MAX_SIZE = 512
def _get_script_dir() -> str:
    """
    Locate the directory that contains this script file.

    Returns:
        Absolute path of the directory holding this module's ``__file__``
    """
    script_path = os.path.abspath(__file__)
    return os.path.dirname(script_path)


# Script directory for resolving relative paths
SCRIPT_DIR = _get_script_dir()
def _debug_log(tag: str, message: str, debug: bool) -> None:
"""Print a debug message to stderr if debug mode is enabled."""
if debug:
print(f"[{tag}] {message}", file=sys.stderr)
def _ms_to_seconds(ms: int) -> float:
"""
Convert milliseconds to seconds.
Args:
ms: Time in milliseconds
Returns:
Time in seconds as float
"""
return ms / 1000.0
def _get_default_port(method: str) -> int:
    """
    Pick the port to use when the router check does not specify one.

    Args:
        method: Router check method ("https", "http", or "tcp"); matched
            case-insensitively

    Returns:
        DEFAULT_HTTP_PORT for "http" and "tcp"; DEFAULT_HTTPS_PORT for
        "https" and for any unrecognised method
    """
    if method.lower() in ("http", "tcp"):
        return DEFAULT_HTTP_PORT
    # "https" and every unknown method share the HTTPS port.
    return DEFAULT_HTTPS_PORT
def detect_execution_mode() -> str:
    """
    Decide which MeshMonitor mode the script is running in.

    The decision is based on the MESSAGE and TRIGGER environment variables:
    a non-empty value in either one selects Auto Responder mode
    (user-triggered); otherwise Timer Trigger mode (scheduled) is assumed.

    Returns:
        "auto_responder" if MESSAGE or TRIGGER is set to a non-empty value
        "timer_trigger" otherwise
    """
    trigger_vars = ("MESSAGE", "TRIGGER")
    if any(os.environ.get(name) for name in trigger_vars):
        return "auto_responder"
    return "timer_trigger"
def parse_auto_responder_command(message: str) -> str:
    """
    Map the Auto Responder MESSAGE text to a command name.

    The message is lowercased and searched for keywords by substring match.
    Keywords are tried in a fixed priority order (status/all, router, dns,
    version) regardless of where they appear in the message; the first
    group that matches wins.

    Args:
        message: The raw MESSAGE environment variable value

    Returns:
        Command string: "status", "router", "dns", "version", or "help"
        ("help" for an empty/blank message or when nothing matches)
    """
    if not (message and message.strip()):
        return "help"

    lowered = message.lower()
    # (keywords, command) pairs in priority order
    keyword_table = (
        (("status", "all"), "status"),
        (("router",), "router"),
        (("dns",), "dns"),
        (("version",), "version"),
    )
    for keywords, command in keyword_table:
        if any(keyword in lowered for keyword in keywords):
            return command
    return "help"
def _get_dns_display_name(check: Dict[str, Any], index: int) -> str:
"""
Extract display name from DNS check config or generate fallback.
Args:
check: DNS check configuration dictionary
index: Zero-based index for fallback naming
Returns:
Display name for the DNS server
"""
return check.get("name", f"DNS-{index}")
def _log_router_failure(method: str, host: str, port: int, reason: str, debug: bool) -> None:
    """
    Emit a "[Router] FAIL: ..." debug line for a failed router check.

    Args:
        method: Router check method
        host: Router hostname or IP
        port: Port number
        reason: Failure reason
        debug: Passed through to _debug_log; nothing is printed when False
    """
    endpoint = f"{host}:{port}"
    _debug_log("Router", f"FAIL: {method} {endpoint} - {reason}", debug)
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Load the configuration file and overlay it on DEFAULT_CONFIG.

    Top-level keys from the file replace the defaults. The nested "messages"
    and "routerCheck" dictionaries are merged key by key, so a partial
    override keeps the remaining defaults. DEFAULT_CONFIG itself is never
    modified or aliased by the returned dictionary.

    Args:
        config_path: Path to configuration JSON file. If None, uses script-relative path.

    Returns:
        Configuration dictionary with defaults applied

    Raises:
        FileNotFoundError: If config file is missing
        ValueError: If the file's top-level JSON value is not an object
        SystemExit: If the file cannot be read or is not valid JSON
            (the error is printed to stderr, exit status 1)
    """
    # Resolve config path relative to script directory if not provided
    if config_path is None:
        config_path = os.path.join(SCRIPT_DIR, "memon.config.json")

    # Check if config file exists
    if not os.path.exists(config_path):
        raise FileNotFoundError("Missing memon.config.json (copy memon.config.example.json to memon.config.json)")

    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            user_config = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        print(f"Error loading config file {config_path}: {e}", file=sys.stderr)
        sys.exit(1)

    if not isinstance(user_config, dict):
        raise ValueError(f"Config file {config_path} must contain a JSON object at the top level")

    # Copy nested containers so the result never shares mutable objects with
    # DEFAULT_CONFIG (dict.copy() alone would alias "messages", "routerCheck"
    # and "dnsChecks").
    config: Dict[str, Any] = {}
    for key, value in DEFAULT_CONFIG.items():
        if isinstance(value, dict):
            config[key] = dict(value)
        elif isinstance(value, list):
            config[key] = list(value)
        else:
            config[key] = value

    for key, value in user_config.items():
        if key in ("messages", "routerCheck") and isinstance(value, dict):
            # Real per-key merge: user entries win, unspecified ones keep defaults.
            config[key].update(value)
        else:
            config[key] = value

    # check_router derives the port from the method when "port" is absent
    # (http/tcp -> 80, https -> 443). Keep that behaviour for users who set a
    # method but no port, instead of forcing the default 443 on them.
    router_override = user_config.get("routerCheck")
    if isinstance(router_override, dict) and "port" not in router_override:
        config["routerCheck"].pop("port", None)

    return config
def load_state(state_path: Optional[str] = None, debug: bool = False) -> Dict[str, Any]:
    """
    Load the state file, falling back to DEFAULT_STATE when it is missing or unusable.

    A state file is treated as corrupted (defaults are used) when it cannot
    be read, is not valid JSON, or its top-level value is not a JSON object.
    A non-numeric "lastAlertTs" is reset to the default, and a value in the
    future is clamped to the current time.

    Args:
        state_path: Path to state JSON file. If None, uses script-relative path.
        debug: If True, print debug messages to stderr.

    Returns:
        State dictionary with defaults applied
    """
    # Resolve state path relative to script directory if not provided
    if state_path is None:
        state_path = os.path.join(SCRIPT_DIR, "memon.state.json")

    # Copy list values so callers mutating the result cannot alter DEFAULT_STATE.
    state = {
        key: (list(value) if isinstance(value, list) else value)
        for key, value in DEFAULT_STATE.items()
    }

    if os.path.exists(state_path):
        try:
            with open(state_path, 'r', encoding='utf-8') as f:
                user_state = json.load(f)
            if not isinstance(user_state, dict):
                # Valid JSON but not an object: same treatment as a corrupted file.
                raise ValueError("state file does not contain a JSON object")
            state.update(user_state)

            current_time = int(time.time())
            old_ts = state.get("lastAlertTs", 0)
            if isinstance(old_ts, bool) or not isinstance(old_ts, (int, float)):
                # A non-numeric timestamp would break the backoff arithmetic later on.
                state["lastAlertTs"] = DEFAULT_STATE["lastAlertTs"]
                _debug_log("State", f"Invalid lastAlertTs {old_ts!r}, reset to {state['lastAlertTs']}", debug)
            elif old_ts > current_time:
                # Clamp lastAlertTs if in future (prevents infinite backoff if clock jumped forward then corrected)
                state["lastAlertTs"] = current_time
                _debug_log("State", f"Clock skew detected, clamped lastAlertTs from {old_ts} to {current_time}", debug)
        except (ValueError, IOError):
            # json.JSONDecodeError is a ValueError subclass; use defaults for any unusable file
            _debug_log("State", "State file corrupted, using defaults", debug)

    _debug_log("State", f"Loaded: failStreak={state['failStreak']}, downNotified={state['downNotified']}, lastAlertTs={state['lastAlertTs']}", debug)
    return state
def save_state(state: Dict[str, Any], state_path: Optional[str] = None, debug: bool = False) -> None:
    """
    Persist the state dictionary as indented JSON.

    Args:
        state: State dictionary to save
        state_path: Path to state JSON file. If None, uses script-relative path.
        debug: If True, print debug messages to stderr.

    Raises:
        SystemExit: If state file cannot be written (error goes to stderr, exit status 1)
    """
    # Fall back to the script-relative state file when no path is given
    if state_path is None:
        state_path = os.path.join(SCRIPT_DIR, "memon.state.json")

    try:
        with open(state_path, 'w', encoding='utf-8') as handle:
            json.dump(state, handle, indent=2)
        saved_streak = state.get('failStreak')
        saved_notified = state.get('downNotified')
        _debug_log("State", f"Saved: failStreak={saved_streak}, downNotified={saved_notified}", debug)
    except IOError as err:
        print(f"Error saving state file {state_path}: {err}", file=sys.stderr)
        sys.exit(1)
def _check_router_http_request(url: str, timeout_ms: int, ssl_context: Any = None) -> bool:
    """
    Issue one HTTP/HTTPS request and report whether the router answered.

    Args:
        url: URL to check
        timeout_ms: Request timeout in milliseconds
        ssl_context: SSL context for HTTPS requests, or None for HTTP

    Returns:
        True if the response status is in the 2xx/3xx range, False for any
        other status or for any exception raised while making the request
    """
    try:
        request = urllib.request.Request(url)
        open_args = {"timeout": _ms_to_seconds(timeout_ms)}
        if ssl_context is not None:
            open_args["context"] = ssl_context
        with urllib.request.urlopen(request, **open_args) as reply:
            status = reply.getcode()
            # Any 2xx or 3xx response is considered success
            return 200 <= status < 400
    except Exception:
        # URLError, HTTPError, OSError, ssl.SSLError and anything unexpected
        # are all reported as "check failed" so the script never crashes here.
        return False
def check_router_https(url: str, insecure_tls: bool, timeout_ms: int) -> bool:
    """
    Probe the router over HTTPS.

    Args:
        url: HTTPS URL to check
        insecure_tls: If True, disable TLS certificate validation
        timeout_ms: Request timeout in milliseconds

    Returns:
        True if router responds successfully, False otherwise
    """
    context = ssl.create_default_context()
    if insecure_tls:
        # Accept any certificate: no hostname check, no chain verification
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return _check_router_http_request(url, timeout_ms, ssl_context=context)
def check_router_http(url: str, timeout_ms: int) -> bool:
    """
    Probe the router over plain HTTP.

    Args:
        url: HTTP URL to check
        timeout_ms: Request timeout in milliseconds

    Returns:
        True if router responds successfully, False otherwise
    """
    # No SSL context: the shared helper then performs an unencrypted request
    return _check_router_http_request(url, timeout_ms, ssl_context=None)
def check_router_tcp(host: str, timeout_ms: int, port: int = 80) -> bool:
    """
    Probe the router by opening (and immediately closing) a TCP connection.

    Uses TCP socket connection instead of ICMP ping to avoid requiring root privileges.

    Args:
        host: Hostname or IP address to test
        timeout_ms: Timeout in milliseconds for the connection attempt
        port: TCP port to connect to (default: 80)

    Returns:
        True if TCP connection succeeds, False otherwise (including on any
        exception)
    """
    try:
        # The context manager closes the socket as soon as the connect succeeds
        with socket.create_connection((host, port), timeout=_ms_to_seconds(timeout_ms)):
            return True
    except Exception:
        # Timeouts, socket/OS errors, bad arguments and anything unexpected
        # all count as "router not reachable".
        return False
def check_router(router_check: Dict[str, Any], timeout_ms: int, debug: bool = False) -> bool:
    """
    Perform router check (HTTPS, HTTP, or TCP socket connection).

    IPv6 literal hosts are supported: they are wrapped in brackets when
    placed in a URL and unwrapped when handed to the TCP check.

    Args:
        router_check: Router check configuration ("method", "host", "port",
            "insecureTls"); missing "method"/"host" default to "https" and
            "192.168.1.1", a missing "port" is derived from the method
        timeout_ms: Timeout in milliseconds
        debug: If True, print debug messages to stderr

    Returns:
        True if router check passes, False otherwise
    """
    method = router_check.get("method", "https").lower()
    host = router_check.get("host", "192.168.1.1")
    port = router_check.get("port")  # None if not specified

    # Use default port if not specified
    if port is None:
        port = _get_default_port(method)

    bracketed = host.startswith("[") and host.endswith("]")
    if method == "tcp":
        # Sockets want the bare address, without URL-style brackets
        socket_host = host[1:-1] if bracketed else host
        result = check_router_tcp(socket_host, timeout_ms, port)
    else:
        # An IPv6 literal must be bracketed inside a URL, otherwise its colons
        # are indistinguishable from the host:port separator.
        url_host = f"[{host}]" if ":" in host and not bracketed else host
        if method == "http":
            url = f"http://{url_host}:{port}"
            result = check_router_http(url, timeout_ms)
        else:  # https (default, also used for unrecognised methods)
            url = f"https://{url_host}:{port}"
            insecure_tls = router_check.get("insecureTls", False)
            result = check_router_https(url, insecure_tls, timeout_ms)

    if result:
        _debug_log("Router", f"OK: {method} {host}:{port}", debug)
    else:
        _log_router_failure(method, host, port, f"{method.upper()} check failed", debug)
    return result
def _encode_domain_name(domain: str) -> bytes:
"""
Encode domain name for DNS packet (length-prefixed labels, null-terminated).
Args:
domain: Domain name (e.g., "google.com")
Returns:
Encoded domain name as bytes
"""
encoded = b""
for label in domain.split("."):
if label:
encoded += struct.pack("B", len(label)) + label.encode("ascii")
encoded += b"\x00" # Null terminator
return encoded
def _build_dns_query(qname: str, rrtype: str) -> bytes:
    """
    Assemble a single-question DNS query packet.

    Args:
        qname: Query name (domain to resolve)
        rrtype: Record type; "AAAA" (case-insensitive) selects AAAA, anything
            else is sent as an A query

    Returns:
        DNS query packet as bytes: 12-byte header, encoded name, QTYPE, QCLASS
    """
    import random

    # Header fields: random ID, flags 0x0100 (standard query, recursion
    # desired), QDCOUNT=1, ANCOUNT=NSCOUNT=ARCOUNT=0
    query_id = random.randint(0, 65535)
    header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)

    name_bytes = _encode_domain_name(qname)

    # QTYPE: A=1, AAAA=28; QCLASS: IN=1
    record_type = 28 if rrtype.upper() == "AAAA" else 1
    trailer = struct.pack("!HH", record_type, 1)

    return header + name_bytes + trailer
def _parse_dns_response(data: bytes, expected_rrtype: str) -> Tuple[bool, str]:
"""
Parse DNS response packet and verify it contains expected record type.
Args:
data: DNS response packet bytes
expected_rrtype: Expected record type (A or AAAA)
Returns:
Tuple of (success: bool, error_message: str)
"""
if len(data) < 12:
return False, "Response too short (less than 12 bytes)"
try:
# Parse header
header = struct.unpack("!HHHHHH", data[0:12])
flags = header[1]
qdcount = header[2]
ancount = header[3]
# Check response code (bits 0-3 of flags byte 2)
rcode = flags & 0x000F
if rcode != 0: # NOERROR = 0, NXDOMAIN = 3, etc.
rcode_names = {0: "NOERROR", 1: "FORMERR", 2: "SERVFAIL", 3: "NXDOMAIN", 4: "NOTIMP", 5: "REFUSED"}
rcode_name = rcode_names.get(rcode, f"RCODE{rcode}")
return False, f"DNS response error: {rcode_name}"
# Check if we have answers
if ancount == 0:
return False, "No answers in DNS response"
# Skip question section to find answer section
offset = 12
# Skip QNAME
while offset < len(data) and data[offset] != 0:
if data[offset] & 0xC0 == 0xC0: # Compression pointer (RFC 1035 4.1.4) - name continues at pointed-to offset
offset += 2
break
else:
label_len = data[offset]
if label_len == 0:
break
if offset + 1 + label_len > len(data):
return False, "Invalid QNAME: label extends beyond packet"
offset += 1 + label_len
if offset < len(data) and data[offset] == 0:
offset += 1 # Skip null terminator
# Skip QTYPE and QCLASS (4 bytes)
if offset + 4 > len(data):
return False, "Invalid question section: QTYPE/QCLASS missing"
offset += 4
# Parse answer section
expected_type = 28 if expected_rrtype.upper() == "AAAA" else 1
found_match = False
for _ in range(ancount):
if offset >= len(data):
return False, "Answer section extends beyond packet"
# Skip NAME (may be compressed)
if offset < len(data) and data[offset] & 0xC0 == 0xC0:
offset += 2 # Compression pointer
else:
# Skip uncompressed name
while offset < len(data) and data[offset] != 0:
label_len = data[offset]
if label_len == 0:
break
if offset + 1 + label_len > len(data):
return False, "Invalid answer NAME: label extends beyond packet"
offset += 1 + label_len
if offset < len(data):
offset += 1 # Skip null terminator
if offset + 10 > len(data):
return False, "Answer record header incomplete"
# Parse answer record: TYPE (2), CLASS (2), TTL (4), RDLENGTH (2)
answer_header = struct.unpack("!HHIH", data[offset:offset+10])
answer_type = answer_header[0]
rdlength = answer_header[3] # RDLENGTH is the 4th element (index 3)
offset += 10
# Check if this answer matches expected type
if answer_type == expected_type:
# Verify RDATA length matches expected type
if expected_type == 1: # A record
if rdlength == 4: # IPv4 is 4 bytes
found_match = True
break
elif expected_type == 28: # AAAA record
if rdlength == 16: # IPv6 is 16 bytes
found_match = True
break
# Skip RDATA
if offset + rdlength > len(data):
return False, "Answer RDATA extends beyond packet"
offset += rdlength
if not found_match:
return False, f"No {expected_rrtype} record found in response"
return True, ""
except (struct.error, IndexError) as e:
return False, f"DNS parsing error: {str(e)}"
def check_dns(server: str, qname: str, rrtype: str, timeout_ms: int) -> Tuple[bool, str]:
    """
    Check single DNS resolver using standard library socket.

    Sends one UDP query to the resolver on DEFAULT_DNS_PORT, waits for one
    datagram, checks that its transaction ID matches the query and then
    validates it with _parse_dns_response.

    Args:
        server: DNS server IP address (IPv4, or IPv6 if it contains ":")
        qname: Query name (domain to resolve)
        rrtype: Record type (A or AAAA)
        timeout_ms: Timeout in milliseconds

    Returns:
        Tuple of (success: bool, error_message: str); never raises
    """
    try:
        timeout_sec = _ms_to_seconds(timeout_ms)

        # Build DNS query packet
        query = _build_dns_query(qname, rrtype)

        # An address containing ":" can only be an IPv6 literal
        family = socket.AF_INET6 if ":" in server else socket.AF_INET

        # The context manager closes the socket on every exit path
        with socket.socket(family, socket.SOCK_DGRAM) as sock:
            sock.settimeout(timeout_sec)
            sock.sendto(query, (server, DEFAULT_DNS_PORT))
            data, _ = sock.recvfrom(DNS_UDP_MAX_SIZE)

        # The first two bytes of a reply echo the query's transaction ID; a
        # mismatch means the datagram does not answer this query.
        if data[:2] != query[:2]:
            return False, "DNS response transaction ID mismatch"

        # Parse and validate response
        success, error_msg = _parse_dns_response(data, rrtype)
        if not success:
            return False, error_msg
        return True, ""
    except socket.timeout:
        return False, "Timeout waiting for DNS response"
    except (socket.error, OSError) as e:
        return False, f"Socket error: {str(e)}"
    except (ValueError, struct.error) as e:
        return False, f"Protocol error: {str(e)}"
    # Catch any other unexpected exceptions to prevent script crash
    except Exception as e:
        return False, f"Unexpected error: {str(e)}"
def check_all_dns(dns_checks: List[Dict[str, Any]], timeout_ms: int, max_total_time: float, debug: bool = False) -> Tuple[List[str], List[str]]:
    """
    Check all DNS resolvers in parallel (with timeout protection).

    Every entry of failed_names is also an entry of all_names: a check
    without a "name" is called "DNS-<index>" in both lists.

    Args:
        dns_checks: List of DNS check configurations ("name", "server",
            "qname", "rrtype"; the last three default to "8.8.8.8",
            "google.com" and "A")
        timeout_ms: Timeout per DNS check in milliseconds
        max_total_time: Maximum total time allowed (seconds)
        debug: If True, print debug messages to stderr

    Returns:
        Tuple of (failed_names: List[str], all_names: List[str])
    """
    if not dns_checks:
        return [], []

    all_names = [check.get("name", f"DNS-{i}") for i, check in enumerate(dns_checks)]
    failed_names: List[str] = []
    start_time = time.time()

    # Leaving the executor's "with" block waits for every worker, so a query
    # must never be allowed to run longer than the overall budget.
    per_check_timeout_ms = min(timeout_ms, max(1, int(max_total_time * 1000)))

    # Use ThreadPoolExecutor for parallel DNS checks
    with ThreadPoolExecutor(max_workers=len(dns_checks)) as executor:
        pending = []
        for name, check in zip(all_names, dns_checks):
            server = check.get("server", "8.8.8.8")
            qname = check.get("qname", "google.com")
            rrtype = check.get("rrtype", "A")
            future = executor.submit(check_dns, server, qname, rrtype, per_check_timeout_ms)
            pending.append((future, name, server, qname))

        # Wait for all with overall timeout protection
        for future, name, server, qname in pending:
            remaining_time = max_total_time - (time.time() - start_time)

            if remaining_time <= 0:
                # Out of time, mark remaining as failed
                failed_names.append(name)
                _debug_log("DNS", f"FAIL: {name} ({server}) querying {qname} - Timeout (out of time)", debug)
                continue

            try:
                # Small grace period on top of the socket timeout for thread scheduling
                wait_limit = min(remaining_time, _ms_to_seconds(per_check_timeout_ms) + 0.5)
                success, error_msg = future.result(timeout=wait_limit)
                if not success:
                    failed_names.append(name)
                    _debug_log("DNS", f"FAIL: {name} ({server}) querying {qname} - {error_msg}", debug)
                else:
                    _debug_log("DNS", f"OK: {name} ({server}) querying {qname}", debug)
            except FutureTimeoutError:
                failed_names.append(name)
                _debug_log("DNS", f"FAIL: {name} ({server}) querying {qname} - Timeout waiting for response", debug)
            except Exception as e:
                failed_names.append(name)
                _debug_log("DNS", f"FAIL: {name} ({server}) querying {qname} - Exception: {str(e)}", debug)

    return failed_names, all_names
def classify_status(router_ok: bool, failed_dns: List[str], all_dns: List[str]) -> Optional[str]:
    """
    Classify the overall network status from the check results.

    Args:
        router_ok: Whether router check passed
        failed_dns: List of failed DNS resolver names
        all_dns: List of all DNS resolver names

    Returns:
        "routerDown" if the router check failed,
        "ispDown" if every DNS resolver failed,
        "upstreamDnsDown" if only some failed,
        None if everything is OK (or no DNS checks are configured)
    """
    if not router_ok:
        return "routerDown"
    if not all_dns:
        # No DNS checks configured, all OK
        return None

    failures = len(failed_dns)
    if failures == len(all_dns):
        return "ispDown"
    return "upstreamDnsDown" if failures > 0 else None
def should_fire_down_alert(fail_streak: int, must_fail_count: int, down_notified: bool,
                           last_alert_ts: int, backoff_seconds: int, current_time: int) -> bool:
    """
    Decide whether a DOWN alert should be sent now.

    All three conditions must hold: the failure streak has reached the
    threshold, no DOWN alert has been sent yet, and the backoff period since
    the last alert has elapsed.

    Args:
        fail_streak: Current failure streak count
        must_fail_count: Required failures before alerting
        down_notified: Whether down alert was already sent
        last_alert_ts: Timestamp of last alert
        backoff_seconds: Backoff period in seconds
        current_time: Current timestamp (to avoid multiple time.time() calls)

    Returns:
        True if DOWN alert should fire
    """
    streak_reached = fail_streak >= must_fail_count
    if not streak_reached or down_notified:
        return False
    backoff_elapsed = (current_time - last_alert_ts) >= backoff_seconds
    return backoff_elapsed
def should_fire_up_alert(all_ok: bool, down_notified: bool) -> bool:
    """
    Decide whether an UP (recovery) alert should be sent.

    Args:
        all_ok: Whether all checks passed
        down_notified: Whether down alert was previously sent

    Returns:
        True if everything is OK again and a DOWN alert had been sent
    """
    if not all_ok:
        return all_ok
    return down_notified
def should_fire_partial_recovery_alert(last_status: Optional[str], current_status: Optional[str],
                                       down_notified: bool, last_failed_dns: List[str],
                                       current_failed_dns: List[str]) -> bool:
    """
    Decide whether a partial recovery alert should be sent.

    A partial recovery is one of these transitions, and only counts when a
    DOWN alert was sent earlier:
    1. routerDown → ispDown (router recovered, all DNS failed)
    2. routerDown → upstreamDnsDown (router recovered, some DNS failed)
    3. ispDown → upstreamDnsDown (all DNS failed → some DNS recovered)
    4. upstreamDnsDown → upstreamDnsDown with fewer failing resolvers
       (only the number of failures is compared, not the names)

    This function does not look at the backoff period.

    Args:
        last_status: Previous status classification
        current_status: Current status classification
        down_notified: Whether down alert was previously sent
        last_failed_dns: List of DNS resolver names that failed previously
        current_failed_dns: List of DNS resolver names that are currently failing

    Returns:
        True if partial recovery alert should fire
    """
    if not down_notified:
        return False

    # Scenarios 1 and 2: the router came back but DNS is still degraded
    if last_status == "routerDown" and current_status in ("ispDown", "upstreamDnsDown"):
        return True

    # The remaining scenarios all end in upstreamDnsDown
    if current_status != "upstreamDnsDown":
        return False

    # Scenario 3: at least one resolver recovered after a total DNS outage
    if last_status == "ispDown":
        return True

    # Scenario 4: still partially down, but fewer resolvers are failing
    return last_status == "upstreamDnsDown" and len(current_failed_dns) < len(last_failed_dns)
def emit_alert(message: str) -> None:
    """
    Print the alert to stdout as a one-line JSON object {"response": ...}.

    Messages longer than MAX_MESSAGE_LENGTH characters are cut and end in
    "..." so that the result is exactly MAX_MESSAGE_LENGTH characters long.

    Args:
        message: Alert message (max length per MeshMonitor requirement)
    """
    if len(message) > MAX_MESSAGE_LENGTH:
        # Leave room for the three-character ellipsis
        keep = MAX_MESSAGE_LENGTH - 3
        message = message[:keep] + "..."

    payload = json.dumps({"response": message}, ensure_ascii=False)
    print(payload, flush=True)
def replace_placeholders(template: str, failed_names: List[str]) -> str:
    """
    Fill the {{failed}} placeholder of a message template.

    Args:
        template: Message template, optionally containing {{failed}}
        failed_names: List of failed DNS resolver names

    Returns:
        The template with every {{failed}} replaced by the names joined with
        ", "; the template unchanged if it has no placeholder
    """
    placeholder = "{{failed}}"
    if placeholder not in template:
        return template
    joined_names = ", ".join(failed_names)
    return template.replace(placeholder, joined_names)
def _format_alert_message(status: Optional[str], messages: Dict[str, str], failed_dns: List[str]) -> str:
"""
Format alert message based on status and failed DNS resolvers.
Args:
status: Current status classification
messages: Message templates dictionary
failed_dns: List of failed DNS resolver names
Returns:
Formatted alert message
"""
if status == "routerDown":
return messages.get("routerDown", "Router is down")
elif status == "ispDown":
return messages.get("ispDown", "All DNS resolvers failed - ISP may be down")
elif status == "upstreamDnsDown":
template = messages.get("upstreamDnsDown", "DNS resolvers failed: {{failed}}")
return replace_placeholders(template, failed_dns)
else:
return "Network issue detected"
def _build_dns_status_list(dns_checks: List[Dict[str, Any]], failed_dns: List[str]) -> List[str]:
"""Build list of 'Name OK/FAIL' strings for all DNS checks."""
dns_statuses = []
for i, check in enumerate(dns_checks):
name = _get_dns_display_name(check, i)
status = "FAIL" if name in failed_dns else "OK"
dns_statuses.append(f"{name} {status}")
return dns_statuses
def format_status_report(router_ok: bool, failed_dns: List[str], all_dns: List[str],
                         dns_checks: List[Dict[str, Any]]) -> str:
    """
    Format the combined router + DNS status report for Auto Responder mode.

    Args:
        router_ok: Whether router check passed
        failed_dns: List of failed DNS resolver names
        all_dns: List of all DNS resolver names
        dns_checks: DNS check configurations (for name extraction)

    Returns:
        Status message of at most MAX_MESSAGE_LENGTH characters:
        - Router down: "Router DOWN"
        - Router OK, no DNS: "Router OK"
        - Router OK, all DNS fail: "Router OK, All DNS FAIL"
        - Otherwise: "Router OK, DNS: Google OK, Cloudflare FAIL, ..."
          or, when that is too long, "Router OK, 2 of 5 DNS FAIL"
    """
    if not router_ok:
        return "Router DOWN"
    if not all_dns:
        return "Router OK"

    failed_total = len(failed_dns)
    dns_total = len(all_dns)
    if failed_total == dns_total:
        return "Router OK, All DNS FAIL"

    # Preferred form: one "Name OK/FAIL" entry per resolver
    detailed = "Router OK, DNS: " + ", ".join(_build_dns_status_list(dns_checks, failed_dns))
    if len(detailed) <= MAX_MESSAGE_LENGTH:
        return detailed

    # Too long: fall back to counts only
    summary = f"Router OK, {failed_total} of {dns_total} DNS FAIL"
    if len(summary) > MAX_MESSAGE_LENGTH:
        summary = summary[:MAX_MESSAGE_LENGTH - 3] + "..."
    return summary
def format_router_report(router_ok: bool) -> str:
    """
    Format the router-only status report for Auto Responder mode.

    Args:
        router_ok: Whether router check passed

    Returns:
        "Router OK" when the check passed, otherwise "Router DOWN"
    """
    return "Router OK" if router_ok else "Router DOWN"
def format_dns_report(router_ok: bool, failed_dns: List[str], all_dns: List[str],
dns_checks: List[Dict[str, Any]]) -> str:
"""
Format DNS-only status report for Auto Responder mode.
Router must be up to perform DNS checks. If router is down,
reports that DNS could not be checked.
Args:
router_ok: Whether router check passed
failed_dns: List of failed DNS resolver names
all_dns: List of all DNS resolver names
dns_checks: DNS check configurations (for name extraction)
Returns:
Formatted DNS status message (max 200 chars with truncation)
"""
if not router_ok:
return "DNS: Unknown (router down)"