-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherror_registry.py
More file actions
207 lines (166 loc) · 6.5 KB
/
error_registry.py
File metadata and controls
207 lines (166 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python3
"""
Error Registry - Detect silent agent failures.
Scans log files for repeating errors, stuck loops, and rising error rates.
Catches the problems your agent won't tell you about.
CLI usage:
python3 error_registry.py --scan agent.log
python3 error_registry.py --feed # reads stdin
python3 error_registry.py --summary # show current error state
python3 error_registry.py --clear # reset error state
Library usage:
from error_registry import check_errors, get_summary
"""
import json
import re
import sys
from collections import Counter
from datetime import datetime, timedelta
from pathlib import Path
from utils import load_config, load_state, save_state
def _normalize(line):
    """Normalize a log line for fingerprinting.

    Strips timestamps, UUIDs, hex IDs, and numeric sequences so that
    repeated errors with different metadata match the same fingerprint.

    Args:
        line: raw log line (string).

    Returns:
        The lowercased line with variable parts removed or replaced by
        placeholders (<id>, <hex>, <n>), truncated to 120 characters.
    """
    line = line.strip().lower()
    # Strip ISO timestamps. The line has already been lowercased, so the
    # date/time separator and the UTC suffix must be matched as 't' / 'z';
    # matching an uppercase 'T' here would never succeed.
    line = re.sub(r'\d{4}-\d{2}-\d{2}[t ]\d{2}:\d{2}:\d{2}[.\d]*z?', '', line)
    # Strip UUIDs (before the generic hex rule, which would otherwise
    # consume the 8- and 12-character groups of a UUID)
    line = re.sub(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', '<id>', line)
    # Strip hex IDs (8+ chars)
    line = re.sub(r'[0-9a-f]{8,}', '<hex>', line)
    # Strip bare numbers
    line = re.sub(r'\b\d+\b', '<n>', line)
    return line[:120]
def _is_error_line(line, patterns):
    """Return True if any pattern occurs in the line, ignoring case.

    Matching is a plain substring test, not a regular expression.
    """
    haystack = line.lower()
    for needle in patterns:
        if needle.lower() in haystack:
            return True
    return False
def check_errors(lines, config=None):
    """Scan log lines and return error analysis.

    Args:
        lines: iterable of log lines (strings)
        config: optional config dict (obtained from load_config() if None).
            Settings are read from its "error_registry" section:
              repeat_threshold  - min occurrences to report a repeating
                                  error (default 3)
              loop_threshold    - min occurrences to report a possible
                                  stuck loop (default 5)
              volume_threshold  - a high-volume alert is raised when the
                                  total error count exceeds this (default 50)
              log_patterns      - substrings that mark a line as an error

    Returns:
        dict with keys: repeating, loops, total_errors, alerts, scanned_at
    """
    if config is None:
        config = load_config()

    er_config = config.get("error_registry", {})
    repeat_threshold = er_config.get("repeat_threshold", 3)
    loop_threshold = er_config.get("loop_threshold", 5)
    # Previously a hard-coded 50; now configurable like the other thresholds.
    volume_threshold = er_config.get("volume_threshold", 50)
    patterns = er_config.get("log_patterns", ["ERROR", "FAIL", "Exception", "Traceback"])

    fingerprints = Counter()
    error_count = 0

    for line in lines:
        line = line.rstrip('\n')
        if _is_error_line(line, patterns):
            error_count += 1
            fp = _normalize(line)
            fingerprints[fp] += 1

    # Repeating errors: same fingerprint appears N+ times
    repeating = {fp: count for fp, count in fingerprints.items()
                 if count >= repeat_threshold}

    # Loop detection: any single fingerprint appears loop_threshold+ times
    loops = {fp: count for fp, count in fingerprints.items()
             if count >= loop_threshold}

    # Build alerts
    alerts = []
    if repeating:
        alerts.append(f"{len(repeating)} repeating error(s) detected")
    if loops:
        alerts.append(f"{len(loops)} possible stuck loop(s)")
    if error_count > volume_threshold:
        alerts.append(f"High error volume: {error_count} errors")

    return {
        "repeating": repeating,
        "loops": loops,
        "total_errors": error_count,
        "alerts": alerts,
        "scanned_at": datetime.now().isoformat(),
    }
def get_summary():
    """Return the "error_registry" section of the loaded state.

    An empty dict is returned when the state has no such section.
    """
    return load_state().get("error_registry", {})
def _save_results(results):
    """Store a condensed summary of a scan under state["error_registry"].

    Only counts, alerts, the scan time and the five most frequent repeating
    fingerprints are kept; the full fingerprint tables are not persisted.
    """
    state = load_state()

    repeating = results["repeating"]
    loops = results["loops"]
    summary = {
        "last_scan": results["scanned_at"],
        "total_errors": results["total_errors"],
        "repeating_count": len(repeating),
        "loop_count": len(loops),
        "alerts": results["alerts"],
        "top_errors": dict(Counter(repeating).most_common(5)),
    }

    state["error_registry"] = summary
    save_state(state)
def _print_results(results):
    """Write a human-readable report of scan results to stdout.

    Prints the total error count, then the repeating fingerprints and the
    possible stuck loops (each ordered by descending count, fingerprints
    cut to 80 characters), then any alerts. Empty sections are omitted.
    """
    total = results["total_errors"]
    if not total:
        print("Clean. No errors found.")
        return

    print(f"Errors found: {total}")

    # Both listings share one layout; only the heading and source key differ.
    for heading, key in (("Repeating", "repeating"),
                         ("Possible stuck loops", "loops")):
        counts = results[key]
        if not counts:
            continue
        print(f"\n{heading} ({len(counts)}):")
        ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
        for fingerprint, hits in ranked:
            print(f" {hits}x {fingerprint[:80]}")

    alerts = results["alerts"]
    if alerts:
        print("\nAlerts:")
        for message in alerts:
            print(f" ! {message}")
def main():
    """Command-line entry point for the error registry.

    Modes (checked in this order):
      --summary  print the stored summary of the last scan and return
      --clear    remove the stored error registry state and return
      --scan F   scan log file F
      --feed     scan lines read from stdin
    With --json, the summary or scan result is printed as JSON.

    Exit status after a scan: 0 when no alerts were raised, 1 when there
    are alerts. Exits 1 if the --scan file cannot be read, and 2 (after
    printing help) when no mode was given.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Agent error registry")
    parser.add_argument("--scan", metavar="FILE", help="Scan a log file for errors")
    parser.add_argument("--feed", action="store_true", help="Read from stdin")
    parser.add_argument("--summary", action="store_true", help="Show last scan summary")
    parser.add_argument("--clear", action="store_true", help="Clear error state")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    if args.summary:
        summary = get_summary()
        if args.json:
            print(json.dumps(summary, indent=2))
        elif summary:
            print(f"Last scan: {summary.get('last_scan', 'never')}")
            print(f"Errors: {summary.get('total_errors', 0)}")
            print(f"Repeating: {summary.get('repeating_count', 0)}")
            print(f"Loops: {summary.get('loop_count', 0)}")
            for a in summary.get("alerts", []):
                print(f" ! {a}")
        else:
            print("No scan data yet. Run --scan or --feed first.")
        return

    if args.clear:
        state = load_state()
        state.pop("error_registry", None)
        save_state(state)
        print("Error registry cleared.")
        return

    if args.scan:
        try:
            # Log files often contain stray undecodable bytes; substitute
            # them rather than aborting the scan with UnicodeDecodeError.
            lines = Path(args.scan).read_text(errors="replace").splitlines()
        except FileNotFoundError:
            print(f"File not found: {args.scan}", file=sys.stderr)
            sys.exit(1)
        except OSError as exc:
            # e.g. the path is a directory or is not readable: report it
            # like a missing file instead of dumping a traceback.
            print(f"Cannot read {args.scan}: {exc}", file=sys.stderr)
            sys.exit(1)
    elif args.feed:
        lines = sys.stdin.readlines()
    else:
        parser.print_help()
        sys.exit(2)

    results = check_errors(lines)
    _save_results(results)

    if args.json:
        out = {**results, "repeating": dict(results["repeating"]), "loops": dict(results["loops"])}
        print(json.dumps(out, indent=2))
    else:
        _print_results(results)

    # Non-zero exit lets shell callers detect that alerts were raised.
    sys.exit(1 if results["alerts"] else 0)


# Run the CLI only when executed as a script, not when imported as a library.
if __name__ == "__main__":
    main()