-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathheartbeat.py
More file actions
executable file
·367 lines (303 loc) · 11.3 KB
/
heartbeat.py
File metadata and controls
executable file
·367 lines (303 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
#!/usr/bin/env python3
"""
Cortex Heartbeat — Proactive check-in daemon (OpenClaw-inspired).
Runs every 30 minutes via launchd. Aggregates system state and sends a
Slack message ONLY when action is needed. Silent otherwise (like
OpenClaw's HEARTBEAT_OK pattern).
State sources:
1. Service health (from alert_monitor's alerts.jsonl)
2. GOALS.md pending items
3. Batch job status (~/.cortex/batch/)
4. Supervisor last tick result
5. Test regression alerts
Usage:
python3 cortex/heartbeat.py # Run once
python3 cortex/heartbeat.py --force # Force send even if no action needed
python3 cortex/heartbeat.py --dry-run # Print message without sending
Slack channel configured via CORTEX_HEARTBEAT_CHANNEL env var or
~/.cortex/heartbeat_config.json.
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
# Root of Cortex's per-user runtime state (~/.cortex).
CORTEX_DIR = Path.home() / ".cortex"
# JSONL alert log written by alert_monitor; tail-scanned by _check_alerts().
ALERTS_LOG = CORTEX_DIR / "alerts.jsonl"
# Directory of batch job *.json status files; tallied by _check_batches().
BATCH_DIR = CORTEX_DIR / "batch"
# Last-run record written by _save_state() and read back by _should_run().
HEARTBEAT_STATE = CORTEX_DIR / "heartbeat_state.json"
# Optional JSON object whose keys override the defaults built in _load_config().
HEARTBEAT_CONFIG = CORTEX_DIR / "heartbeat_config.json"
# GOALS.md two directories above this file (the repo root when this file
# lives at cortex/heartbeat.py, as in the usage examples).
GOALS_PATH = Path(__file__).resolve().parent.parent / "GOALS.md"
# Directory of supervisor run summaries (*.json); _check_supervisor() reads
# the last file in sorted filename order.
SUPERVISOR_LOG = CORTEX_DIR / "orchestration" / "runs"
# Minimum interval between heartbeat runs (avoid spam). _should_run() compares
# against the last recorded run, and silent HEARTBEAT_OK runs are recorded too,
# so this throttles runs rather than only sent messages. Presumably kept below
# the 30-minute launchd cadence so scheduled runs are not skipped.
MIN_INTERVAL_MINUTES = 25
def _now() -> datetime:
return datetime.now(timezone.utc)
def _load_config() -> dict:
"""Load heartbeat config (Slack channel, etc.)."""
defaults = {
"slack_channel": os.environ.get("CORTEX_HEARTBEAT_CHANNEL", ""),
"slack_token": os.environ.get("SLACK_BOT_TOKEN", ""),
"min_severity": "MEDIUM",
"silent_ok": True,
}
try:
if HEARTBEAT_CONFIG.exists():
cfg = json.loads(HEARTBEAT_CONFIG.read_text())
defaults.update(cfg)
except Exception:
pass
return defaults
def _should_run() -> bool:
    """Rate-limit check: is it at least MIN_INTERVAL_MINUTES since the last run?

    Reads ``last_run`` from HEARTBEAT_STATE. A missing state file, a missing
    ``last_run`` key, or any read/parse error all mean "go ahead and run".
    """
    try:
        if not HEARTBEAT_STATE.exists():
            return True
        previous = json.loads(HEARTBEAT_STATE.read_text())
        stamp = previous.get("last_run", "2000-01-01T00:00:00+00:00")
        last = datetime.fromisoformat(stamp)
        elapsed = _now() - last
        return elapsed >= timedelta(minutes=MIN_INTERVAL_MINUTES)
    except Exception:
        return True
def _save_state(sent: bool, summary: str):
    """Best-effort write of this run's outcome to HEARTBEAT_STATE.

    Stores ``last_run`` (current UTC time, ISO 8601), ``sent``, and the first
    200 characters of ``summary`` as JSON. _should_run() reads ``last_run``
    back for rate limiting. Every failure is silently ignored.
    """
    try:
        state_dir = HEARTBEAT_STATE.parent
        state_dir.mkdir(parents=True, exist_ok=True)
        record = {
            "last_run": _now().isoformat(),
            "sent": sent,
            "summary": summary[:200],
        }
        HEARTBEAT_STATE.write_text(json.dumps(record))
    except Exception:
        pass
# ── State Collectors ─────────────────────────────────────────────────
def _check_alerts() -> list[dict]:
"""Check for recent HIGH/CRITICAL alerts (last 30 min)."""
alerts = []
cutoff = _now() - timedelta(minutes=30)
try:
if not ALERTS_LOG.exists():
return []
for line in ALERTS_LOG.read_text().strip().splitlines()[-50:]:
entry = json.loads(line)
ts = datetime.fromisoformat(entry.get("ts", "2000-01-01"))
if ts > cutoff and entry.get("severity") in ("HIGH", "CRITICAL"):
alerts.append(entry)
except Exception:
pass
return alerts
def _check_goals() -> dict:
"""Count unchecked items in GOALS.md."""
result = {"total": 0, "high_priority": 0, "immediate": []}
try:
if not GOALS_PATH.exists():
return result
text = GOALS_PATH.read_text()
in_high = False
in_immediate = False
for line in text.splitlines():
lower = line.lower().strip()
if "high priority" in lower or "immediate" in lower:
in_high = True
in_immediate = "immediate" in lower
elif lower.startswith("##"):
in_high = False
in_immediate = False
if line.strip().startswith("- [ ]"):
result["total"] += 1
if in_high:
result["high_priority"] += 1
if in_immediate:
result["immediate"].append(line.strip()[6:].strip()[:80])
except Exception:
pass
return result
def _check_batches() -> dict:
    """Tally batch job files in BATCH_DIR by their ``status`` field.

    "completed" and "failed" are counted under their own names. "pending",
    "queued" and "running" are all counted as pending. Unknown statuses and
    unreadable or unparseable files are ignored. Any directory-level error
    returns whatever was tallied so far.
    """
    tally = {"pending": 0, "completed": 0, "failed": 0}
    bucket_for = {
        "completed": "completed",
        "failed": "failed",
        "pending": "pending",
        "queued": "pending",
        "running": "pending",
    }
    try:
        if not BATCH_DIR.exists():
            return tally
        for job_file in BATCH_DIR.glob("*.json"):
            try:
                job = json.loads(job_file.read_text())
                bucket = bucket_for.get(job.get("status", "unknown"))
            except Exception:
                continue
            if bucket is not None:
                tally[bucket] += 1
    except Exception:
        pass
    return tally
def _check_services() -> list[str]:
"""Quick service health check (subset of alert_monitor)."""
down = []
services = [
("vortex-backend", "http://127.0.0.1:8000/api/v2/health"),
("cortex-bridge", "http://127.0.0.1:8765/health"),
]
for name, url in services:
try:
req = urllib.request.Request(url, method="GET")
urllib.request.urlopen(req, timeout=3)
except Exception:
down.append(name)
return down
def _check_supervisor() -> dict | None:
"""Read last supervisor run summary."""
try:
if not SUPERVISOR_LOG.exists():
return None
runs = sorted(SUPERVISOR_LOG.glob("*.json"))
if not runs:
return None
return json.loads(runs[-1].read_text())
except Exception:
return None
# ── Message Formatting ───────────────────────────────────────────────
def _build_message(
alerts: list[dict],
goals: dict,
batches: dict,
services_down: list[str],
supervisor: dict | None,
) -> str | None:
"""Build Slack message. Returns None if no action needed."""
sections = []
needs_action = False
# Service health
if services_down:
needs_action = True
sections.append(f":red_circle: *Services Down*: {', '.join(services_down)}")
# Recent alerts
if alerts:
needs_action = True
alert_lines = [
f" • [{a['severity']}] {a.get('service', '?')}: {a.get('message', '')[:60]}"
for a in alerts[:3]
]
sections.append(":warning: *Recent Alerts*\n" + "\n".join(alert_lines))
# Batch status
if batches["failed"] > 0:
needs_action = True
sections.append(
f":x: *Batch Jobs*: {batches['failed']} failed, {batches['completed']} completed, {batches['pending']} pending"
)
elif batches["completed"] > 0:
sections.append(
f":white_check_mark: *Batch Jobs*: {batches['completed']} completed, {batches['pending']} pending"
)
# Goals
if goals["immediate"]:
needs_action = True
items = "\n".join(f" • {i}" for i in goals["immediate"][:3])
sections.append(f":dart: *Immediate Actions* ({goals['total']} total)\n{items}")
# Supervisor
if supervisor:
errors = supervisor.get("errors", [])
if errors:
needs_action = True
sections.append(f":gear: *Supervisor*: {len(errors)} errors in last run")
if not sections:
return None
header = ":brain: *Cortex Heartbeat*" + (
" — :rotating_light: Action Needed" if needs_action else ""
)
ts = _now().strftime("%H:%M UTC")
footer = f"_Checked at {ts}_"
return header + "\n\n" + "\n\n".join(sections) + "\n\n" + footer
def _send_slack(message: str, config: dict) -> bool:
"""Send message via Slack API."""
token = config.get("slack_token", "")
channel = config.get("slack_channel", "")
if not token or not channel:
# Fall back to desktop notification
try:
subprocess.run(
[
"osascript",
"-e",
f'display notification "{message[:200]}" with title "Cortex Heartbeat"',
],
timeout=5,
capture_output=True,
)
except Exception:
pass
return False
try:
payload = json.dumps(
{
"channel": channel,
"text": message,
"mrkdwn": True,
}
).encode()
req = urllib.request.Request(
"https://slack.com/api/chat.postMessage",
data=payload,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
method="POST",
)
resp = urllib.request.urlopen(req, timeout=10)
result = json.loads(resp.read())
return result.get("ok", False)
except Exception:
return False
# ── Main ─────────────────────────────────────────────────────────────
def main():
    """Run one heartbeat cycle.

    Flags (read from ``sys.argv``):
        --force    bypass the rate limit, and send an "All clear" message
                   even when nothing needs reporting.
        --dry-run  print the message to stdout instead of sending it. The
                   state file is left untouched, so a preview does not delay
                   the next real heartbeat.

    Steps: session-rotation check (always, before rate limiting), load
    config, rate-limit, collect state, build message, then print or send.
    Early-return paths exit with status 0. If Slack delivery fails, the
    message is written to stderr.
    """
    force = "--force" in sys.argv
    dry_run = "--dry-run" in sys.argv

    # Session rotation runs independently of heartbeat rate limiting.
    # NOTE(review): this also runs under --dry-run; confirm that is intended.
    _check_session_rotation()

    config = _load_config()
    if not force and not _should_run():
        sys.exit(0)

    # Collect state (arguments evaluate left to right: alerts, goals,
    # batches, services, supervisor) and build the message.
    message = _build_message(
        _check_alerts(),
        _check_goals(),
        _check_batches(),
        _check_services(),
        _check_supervisor(),
    )

    if message is None:
        if config.get("silent_ok", True) and not force:
            # Nothing to report: stay silent (HEARTBEAT_OK). Recording the run
            # under --dry-run would rate-limit the next real heartbeat.
            if not dry_run:
                _save_state(sent=False, summary="HEARTBEAT_OK")
            sys.exit(0)
        message = ":brain: *Cortex Heartbeat* — All clear :white_check_mark:\n_No action needed._"

    if dry_run:
        # Preview only: no send and no state write.
        print(message)
        sys.exit(0)

    sent = _send_slack(message, config)
    _save_state(sent=sent, summary=message[:200])
    if not sent:
        # Fallback: write to stderr so launchd's log capture keeps the message.
        print(message, file=sys.stderr)
def _check_session_rotation():
"""Check if the active claude session needs rotation (age or request threshold)."""
try:
sys.path.insert(0, str(Path(__file__).resolve().parent))
from session_watcher import check_and_rotate_if_needed
result = check_and_rotate_if_needed()
if result not in ("no_claude_process",) and not result.startswith("ok"):
print(f"[heartbeat] session_watcher: {result}", file=sys.stderr)
except Exception as e:
print(f"[heartbeat] session_watcher error: {e}", file=sys.stderr)
# Script entry point: run one heartbeat cycle when executed directly
# (e.g. `python3 cortex/heartbeat.py`, per the module docstring's usage).
if __name__ == "__main__":
    main()