-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot_base.py
More file actions
1792 lines (1568 loc) · 79.5 KB
/
bot_base.py
File metadata and controls
1792 lines (1568 loc) · 79.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import re
# Copyright (c) 2026 Nardo. AGPL-3.0 — see LICENSE
"""
Multi-persona Telegram bot base.
Each persona is defined in personas/<id>.json and run via: python run_bot.py <id>
"""
import asyncio
import os
import json
import logging
import tempfile
import time as _time_mod
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
from openai import OpenAI
load_dotenv()
logger = logging.getLogger(__name__)
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.error import TelegramError
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
from news import generate_full_digest
from digest_ui import handle_digest_callback
from crypto_news import generate_crypto_digest
from stablecoin_yields import generate_yield_report
from twitter_feed import generate_twitter_digest
from x_feed import generate_xlist_digest
from x_curator import generate_daily_digest, make_key
from reddit_digest import generate_reddit_digest
from x_feedback import record_vote, get_vote
from memory import MemoryManager, format_memory_block, RECENT_WINDOW
from conversation_compressor import ConversationCompressor
from utils import CLAUDE_BIN, PROJECT_DIR
from conversation_logger import log_message
from sanitizer import sanitize_external_content
from llm_client import chat_completion_async, get_primary_client
# MiniMax removed — all LLM calls route through llm_client.py (Kimi → fallback chain)
PERSONAS_DIR = Path(__file__).parent / "personas"
TOPIC_CACHE = Path(__file__).parent / "topic_cache.json"
CLAUDE_SESSIONS_FILE = Path(__file__).parent / "claude_sessions.json"
CLAUDE_COST_LOG = Path(__file__).parent / "claude_cost_log.jsonl"
MINIMAX_COST_LOG = Path(__file__).parent / "minimax_cost_log.jsonl"
# When True, digest scheduling is handled by OS cron (send_digest.py, send_xdigest.py, etc.)
# Bot internal schedulers are disabled to prevent double-sends.
# Manual commands (/digest, /xdigest, /xcurate, /reddit) still work.
CRON_MANAGED = True
MAX_HISTORY = 40
INPUT_MAX_LENGTH = 10000
DEAD_LETTERS_FILE = Path(__file__).parent / "dead_letters.json"
# ── Security: allowed users & rate limiting ──────────────────────────────
_admin_id = int(os.environ.get("ADMIN_USER_ID", "0"))
_allowed_users_raw = os.environ.get("ALLOWED_USERS", "")
ALLOWED_USERS: set[int] = set()
if _allowed_users_raw:
for _uid in _allowed_users_raw.split(","):
_uid = _uid.strip()
if _uid.isdigit():
ALLOWED_USERS.add(int(_uid))
if _admin_id:
ALLOWED_USERS.add(_admin_id)
# Per-user rate limiting: 10 messages per 60 seconds
_RATE_LIMIT_WINDOW = 60
_RATE_LIMIT_MAX = 10
_rate_limit_data: dict[int, list[float]] = defaultdict(list)
_rate_limit_notified: dict[int, float] = {} # user_id → last notification time
def _is_allowed_user(user_id: int) -> bool:
    """Return True when *user_id* may use the bot (empty whitelist = open)."""
    # No configured whitelist means every user is accepted.
    return (not ALLOWED_USERS) or (user_id in ALLOWED_USERS)
def _check_rate_limit(user_id: int) -> bool:
    """Sliding-window limiter: True if the user may send another message now."""
    now = _time_mod.time()
    # Drop timestamps that have aged out of the window, keep the rest.
    recent = [ts for ts in _rate_limit_data[user_id] if now - ts < _RATE_LIMIT_WINDOW]
    _rate_limit_data[user_id] = recent
    if len(recent) >= _RATE_LIMIT_MAX:
        return False
    # Record this message against the user's window.
    recent.append(now)
    return True
def _save_dead_letter(chat_id: int, text: str, error: str) -> None:
    """Append a failed message to dead_letters.json (capped at 100 entries).

    Best-effort: any OSError while writing is logged, never raised, so a
    broken dead-letter file can't take down the send path.
    """
    import stat
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "chat_id": chat_id,
        "text": text[:200],  # store only a preview of the failed message
        "error": error,
    }
    letters = []
    import fcntl
    try:
        # "a+" creates the file if missing; flock serialises concurrent writers.
        with open(DEAD_LETTERS_FILE, "a+") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            f.seek(0)  # "a+" positions at EOF — rewind before reading
            try:
                letters = json.load(f)
            except (json.JSONDecodeError, ValueError):
                letters = []  # empty or corrupted file → start fresh
            letters.append(entry)
            if len(letters) > 100:
                letters = letters[-100:]  # keep only the newest 100 entries
            f.seek(0)
            f.truncate()  # rewrite the whole file in place under the lock
            json.dump(letters, f, indent=2, ensure_ascii=False)
    except OSError as e:
        logging.warning("dead_letter write failed: %s", e)
    try:
        # Tighten permissions to owner read/write (0600); best-effort.
        os.chmod(DEAD_LETTERS_FILE, stat.S_IRUSR | stat.S_IWUSR)
    except OSError:
        pass
def _get_today_cost() -> float:
    """Sum today's Claude Code costs from the JSONL log (HKT day boundary)."""
    if not CLAUDE_COST_LOG.exists():
        return 0.0
    from datetime import timedelta
    hkt = timezone(timedelta(hours=8))
    today_prefix = datetime.now(hkt).strftime("%Y-%m-%d")
    total = 0.0
    try:
        with open(CLAUDE_COST_LOG) as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue  # skip blank lines
                try:
                    record = json.loads(raw)
                except json.JSONDecodeError:
                    continue  # tolerate partially-written lines
                if record.get("ts", "").startswith(today_prefix):
                    total += record.get("cost_usd", 0)
    except Exception as e:
        logger.warning("Failed to read cost log: %s", e)
    return total
DAILY_COST_CAP = float(os.environ.get("DAILY_COST_CAP", "50.0"))
# ── Claude Code session persistence (shared across persona bots) ─────────
def _load_bot_sessions() -> dict:
    """Load the shared Claude session map; empty dict when missing/unreadable."""
    if not CLAUDE_SESSIONS_FILE.exists():
        return {}
    try:
        return json.loads(CLAUDE_SESSIONS_FILE.read_text())
    except Exception as e:
        logging.warning("Failed to load sessions from %s, starting fresh: %s", CLAUDE_SESSIONS_FILE, e)
        return {}
def _save_bot_sessions(sessions: dict) -> None:
    """Atomically persist the shared Claude session map.

    Writes to a temp file in the target directory, then os.replace() so
    concurrent readers never observe a half-written JSON file. On failure
    the temp file is removed (best-effort) and the exception re-raised.
    """
    # Fix: `tempfile` is already imported at module level — the old
    # function-local re-import was redundant and has been removed.
    tmp_fd, tmp_path = tempfile.mkstemp(
        dir=CLAUDE_SESSIONS_FILE.parent, suffix=".tmp"
    )
    try:
        with os.fdopen(tmp_fd, "w") as f:
            json.dump(sessions, f, indent=2)
        os.replace(tmp_path, CLAUDE_SESSIONS_FILE)
    except Exception:
        try:
            os.unlink(tmp_path)  # don't leave orphaned temp files behind
        except OSError:
            pass
        raise
def _log_claude_cost(persona_id: str, session_key: str, cost_usd: float, duration_ms: int) -> None:
    """Append Claude SDK cost entry to JSONL log."""
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "persona": persona_id,
        "session": session_key,
        "cost_usd": cost_usd,
        "duration_ms": duration_ms,
    }
    line = json.dumps(record) + "\n"
    with open(CLAUDE_COST_LOG, "a") as fh:
        fh.write(line)
def _log_minimax_cost(persona_id: str, model: str, input_tokens: int, output_tokens: int) -> None:
    """Append MiniMax/Groq usage entry to JSONL log."""
    # Pricing per 1M tokens (USD)
    _pricing = {
        "MiniMax-M2.5": {"input": 0.50, "output": 1.50},
        "MiniMax-M2.5-highspeed": {"input": 0.50, "output": 1.50},
        "llama-3.3-70b-versatile": {"input": 0.00, "output": 0.00},  # Groq free tier
    }
    # Unknown models fall back to the standard MiniMax rate.
    rates = _pricing.get(model, {"input": 0.50, "output": 1.50})
    cost_usd = (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "persona": persona_id,
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_usd": cost_usd,
    }
    try:
        with open(MINIMAX_COST_LOG, "a") as fh:
            fh.write(json.dumps(record) + "\n")
    except Exception:
        pass  # accounting is best-effort; never break the caller
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
# ── Topic cache (shared JSON file across all personas) ───────────────────────
_retry_logger = logging.getLogger("send_retry")
async def _send_with_retry(bot, chat_id, thread_id, retries=3, **kwargs) -> bool:
    """Send a Telegram message with linear-backoff retries.

    Returns True on success. After the final failed attempt, records a
    dead letter and returns False.
    """
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            await bot.send_message(
                chat_id=chat_id, message_thread_id=thread_id,
                read_timeout=30, write_timeout=30, **kwargs,
            )
            return True
        except TelegramError as e:
            if attempt >= retries:
                _retry_logger.error("Give up after %d attempts (chat=%s): %s", retries, chat_id, e)
                _save_dead_letter(chat_id, kwargs.get("text", "")[:200], str(e))
                return False
            wait = attempt * 3  # 3s, 6s, ... linear backoff
            _retry_logger.warning("Retry %d/%d in %ds (chat=%s): %s", attempt, retries, wait, chat_id, e)
            await asyncio.sleep(wait)
def _load_cache() -> dict:
    """Load the shared topic cache; empty dict when missing or unreadable."""
    if not TOPIC_CACHE.exists():
        return {}
    try:
        return json.loads(TOPIC_CACHE.read_text())
    except Exception as e:
        logging.warning("Failed to load cache from %s, starting fresh: %s", TOPIC_CACHE, e)
        return {}
def _save_cache(cache: dict) -> None:
    """Atomically write the topic cache (temp file, then rename over)."""
    scratch = TOPIC_CACHE.with_suffix(".tmp")
    payload = json.dumps(cache, indent=2, ensure_ascii=False)
    scratch.write_text(payload)
    scratch.replace(TOPIC_CACHE)
# ── Main runner ───────────────────────────────────────────────────────────────
def run_persona(persona_id: str) -> None:
config_path = PERSONAS_DIR / f"{persona_id}.json"
if not config_path.exists():
raise FileNotFoundError(f"Persona config not found: {config_path}")
persona = json.loads(config_path.read_text())
logger = logging.getLogger(persona_id)
# Token: try TELEGRAM_BOT_TOKEN_<ID> first, fall back to TELEGRAM_BOT_TOKEN
token_env = f"TELEGRAM_BOT_TOKEN_{persona_id.upper()}"
token = os.environ.get(token_env) or os.environ.get("TELEGRAM_BOT_TOKEN")
if not token:
raise ValueError(f"Set {token_env} (or TELEGRAM_BOT_TOKEN) in .env")
display_name = persona["display_name"]
topic_names = {n.lower() for n in persona.get("topic_names", [])}
system_prompt = persona["system_prompt"]
voice_enabled = persona.get("voice_enabled", False)
whisper_lang = persona.get("whisper_language")
news_module = persona.get("news_module", "standard")
yields_enabled = persona.get("yields_enabled", False)
digest_enabled = persona.get("digest_enabled", True)
twitter_enabled = persona.get("twitter_enabled", False)
twitter_accounts = persona.get("twitter_accounts", [])
xcurate_target = persona.get("xcurate_target") # {"chat_id": ..., "thread_id": ...}
xcurate_lang = persona.get("xcurate_lang") # "zh" for Chinese-only
xcurate_lists = persona.get("xcurate_lists", []) # list IDs for list-based fetching
reddit_enabled = persona.get("reddit_enabled", False)
reddit_subs = persona.get("reddit_subreddits", [])
reddit_target = persona.get("reddit_target") # {"chat_id": ..., "thread_id": ...}
_responses = persona.get("responses", {})
def _r(key: str, **kw) -> str:
    """Return a persona-specific response string, with optional format vars."""
    # Unknown keys render as "[key]" so missing config is visible, not fatal.
    template = _responses.get(key, f"[{key}]")
    return template.format(display_name=display_name, **kw)
db_path = Path(__file__).parent / f"memory_{persona_id}.db"
subs_file = Path(__file__).parent / f"subscribers_{persona_id}.json"
import time as _time
_start_time = _time.time()
_llm_client, _llm_model = get_primary_client()
logger.info("Primary LLM client: model=%s", _llm_model)
if _llm_client is None:
raise RuntimeError("No LLM provider available — check API keys in .env")
mem = MemoryManager(_llm_client, db_path=db_path, model_name=_llm_model)
compressor = ConversationCompressor(_llm_client, model_name=_llm_model)
convs: dict[tuple, list[dict]] = defaultdict(list)
# In-memory topic cache (thread_id str → name str), keyed by group chat_id str
topic_cache: dict[str, dict[str, str]] = _load_cache()
# ── Whisper (optional) ────────────────────────────────────────────────────
whisper_model = None
if voice_enabled:
from faster_whisper import WhisperModel
logger.info("Loading Whisper for %s…", display_name)
whisper_model = WhisperModel("large-v3", device="cpu", compute_type="int8")
logger.info("Whisper ready.")
# ── Helpers ───────────────────────────────────────────────────────────────
def _thread_name(chat_id: int, thread_id: int | None) -> str | None:
    """Look up the cached topic name; None when the thread is unknown."""
    group = topic_cache.get(str(chat_id), {})
    return group.get(str(thread_id or 0))
def _cache_thread(chat_id: int, thread_id: int | None, name: str) -> None:
    """Remember a topic's name and persist the cache to disk."""
    group_key = str(chat_id)
    topic_key = str(thread_id or 0)
    topic_cache.setdefault(group_key, {})[topic_key] = name
    _save_cache(topic_cache)
    logger.info("Cached topic: chat=%s thread=%s → '%s'", group_key, topic_key, name)
def _is_my_thread(chat_type: str, chat_id: int, thread_id: int | None) -> bool:
    """True when a message belongs to this persona (private chat or owned topic)."""
    if chat_type == "private":
        return True
    if not topic_names:
        return False  # persona owns no topics → ignore all group traffic
    name = _thread_name(chat_id, thread_id)
    if name is None:
        return False
    return name.lower() in topic_names
def _check_private_user(update: Update) -> bool:
    """Return True if this private-chat user is allowed. Groups always pass."""
    if update.effective_chat.type != "private":
        return True
    sender = update.effective_user
    return _is_allowed_user(sender.id if sender else 0)
def _mem_key(chat_id: int, thread_id: int | None) -> str:
    """Memory-store key '<chat>:<thread>'; the main thread normalises to 0."""
    return "{}:{}".format(chat_id, thread_id or 0)
def _conv_key(chat_id: int, thread_id: int | None) -> tuple:
    """In-memory conversation key; the main thread normalises to 0."""
    normalized = thread_id if thread_id else 0
    return (chat_id, normalized)
# ── Subscriber helpers ────────────────────────────────────────────────────
def _load_subs() -> set[int]:
    """Read this persona's subscriber ids; empty set on any problem."""
    if subs_file.exists():
        try:
            return set(json.loads(subs_file.read_text()))
        except Exception:
            pass  # unreadable file → behave as if nobody subscribed
    return set()
def _save_subs(subs: set[int]) -> None:
    """Persist subscriber ids as a sorted JSON list (stable file diffs)."""
    payload = json.dumps(sorted(subs))
    subs_file.write_text(payload)
def _sub_targets() -> list[tuple[int, int | None]]:
    """Return (chat_id, thread_id) pairs: private subs + registered topic threads."""
    # Private subscribers first (no thread id).
    targets: list[tuple[int, int | None]] = [(int(cid), None) for cid in _load_subs()]
    # Then every forum thread whose cached name matches this persona's topics.
    for group_id_str, threads in topic_cache.items():
        for thread_id_str, name in threads.items():
            if name and name.lower() in topic_names:
                targets.append((int(group_id_str), int(thread_id_str)))
    return targets
async def _notify_fail(bot, job_name: str, error: Exception) -> None:
    """Send a failure notice to all subscribers (best-effort per target)."""
    from html import escape as _hesc
    msg = f"⚠️ <b>{_hesc(job_name)}</b> failed\n<code>{_hesc(str(error))}</code>"
    for chat_id, thread_id in _sub_targets():
        try:
            await bot.send_message(
                chat_id=chat_id,
                message_thread_id=thread_id,
                text=msg,
                parse_mode="HTML",
            )
        except Exception:
            pass  # one unreachable target must not block the rest
# ── Core respond logic ────────────────────────────────────────────────────
async def _respond(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
    """Answer *text* via the LLM with memory retrieval and a rolling summary.

    On failure the just-appended user turn is rolled back so the history
    holds no unanswered ghost entries.
    """
    chat_id = update.effective_chat.id
    thread_id = update.message.message_thread_id if update.message else None
    mk = _mem_key(chat_id, thread_id)
    ck = _conv_key(chat_id, thread_id)
    user_id = update.effective_user.id if update.effective_user else 0
    log_message(persona_id, user_id, "user", text, "text")
    try:
        await update.effective_chat.send_action("typing")
        # Truncate old messages BEFORE appending the new user turn
        conv = convs[ck]
        if len(conv) >= MAX_HISTORY:
            truncated = conv[:-MAX_HISTORY]
            compressor.absorb_truncated(ck, truncated)
            convs[ck] = conv[-MAX_HISTORY:]
        # Append user message to the (possibly rebound) active list
        convs[ck].append({"role": "user", "content": text})
        past = mem.retrieve(mk, text)
        memory_block = format_memory_block(past)
        system = system_prompt + ("\n\n" + memory_block if memory_block else "")
        if memory_block:
            logger.info("Injected %d memories for %s", len(past), mk)
        # Compress older messages into rolling summary
        await compressor.maybe_compress_async(ck, convs[ck], RECENT_WINDOW)
        summary_block = compressor.get_summary_block(ck)
        if summary_block:
            system = system + "\n\n" + summary_block
        recent = convs[ck][-RECENT_WINDOW:]
        messages = [{"role": "system", "content": system}, *recent]
        reply = await chat_completion_async(
            messages=messages,
            max_tokens=4096,
        )
        convs[ck].append({"role": "assistant", "content": reply})
        mem.store(mk, "user", text)
        mem.store(mk, "assistant", reply)
        log_message(persona_id, user_id, "assistant", reply, "text", model="llm_client")
        # Telegram caps a message at 4096 chars — split longer replies.
        if len(reply) <= 4096:
            await update.message.reply_text(reply)
        else:
            for i in range(0, len(reply), 4096):
                await update.message.reply_text(reply[i : i + 4096])
    except Exception as e:
        logger.error("Respond error: %s", e)
        # Remove the user message we just appended — no ghost entries
        if convs[ck] and convs[ck][-1].get("role") == "user":
            convs[ck].pop()
        await update.message.reply_text("⚠️ Something went wrong. Check logs.")
# ── Claude Code fallback (for tasks needing tools) ─────────────────────
# #1: Per-topic task queue — only 1 Claude Code task at a time per topic
_claude_locks: dict[tuple, asyncio.Lock] = defaultdict(asyncio.Lock)
# Track which topics are in "Claude mode" for follow-up routing (#6)
_claude_active: dict[tuple, float] = {} # conv_key → last claude timestamp
CLAUDE_MODE_TIMEOUT = 300 # 5 min of inactivity → back to MiniMax
# ── Photo accumulator (intent-based) + album dedup ────────────────────────
_photo_queue: dict[tuple, list[dict]] = defaultdict(list) # ck → [{file_id, caption}]
_media_group_seen: dict[str, float] = {} # media_group_id → timestamp
# ── Text chunk merging (0.6s debounce) ────────────────────────────────────
_text_buffer: dict[tuple, list[str]] = defaultdict(list)
_text_buffer_tasks: dict[tuple, asyncio.Task] = {}
_album_notify_tasks: dict[str, asyncio.Task] = {} # album debounce (separate from text)
_text_buffer_updates: dict[tuple, Update] = {} # keep latest update for flush
_text_buffer_contexts: dict[tuple, ContextTypes.DEFAULT_TYPE] = {}
TEXT_MERGE_DELAY = 0.6
def _in_claude_mode(ck: tuple) -> bool:
    """Check if this topic is in active Claude Code conversation."""
    import time as _t
    last = _claude_active.get(ck)
    if last is None:
        return False
    if _t.time() - last > CLAUDE_MODE_TIMEOUT:
        # Marker is stale — clear it so routing falls back to the chat LLM.
        _claude_active.pop(ck, None)
        return False
    return True
# #2: Smart task detection — ask MiniMax if unsure
async def _needs_claude_smart(text: str) -> bool:
    """Use MiniMax to classify if a message needs tools.

    Cheap heuristics short-circuit the classifier for obvious tasks and
    obvious small talk; any classifier failure defaults to plain chat.
    Fix: removed the redundant function-local `import re` — the module
    already imports `re` at top level.
    """
    lower = text.lower().strip()
    # Obvious tasks — skip the classifier
    obvious = ("research", "find out", "search for", "look up", "analyze",
               "investigate", "scrape", "crawl", "find me", "dig into")
    if any(lower.startswith(t) for t in obvious):
        return True
    # Obvious chat — skip the classifier
    if len(text) < 10 or lower in ("hi", "hello", "hey", "ok", "thanks", "bye", "lol"):
        return False
    # Ask MiniMax (cheap, fast)
    try:
        resp = await asyncio.to_thread(
            _llm_client.chat.completions.create,
            model=_llm_model,
            max_tokens=5,
            messages=[
                {"role": "system", "content":
                 "Classify if this message requires web search, real-time data, file access, "
                 "or code execution to answer properly. Reply ONLY 'tools' or 'chat'."},
                {"role": "user", "content": text},
            ],
        )
        answer = resp.choices[0].message.content.strip().lower()
        # Strip <think> blocks from MiniMax
        answer = re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip().lower()
        return "tool" in answer
    except Exception:
        return False  # default to MiniMax on classifier failure
async def _respond_claude(update: Update, context: ContextTypes.DEFAULT_TYPE, text: str) -> None:
    """Run a task via Claude Code CLI with persistent session per topic.

    Enforces the daily cost cap, allows one in-flight task per topic, and
    retries once with a fresh session when the saved one appears expired.
    """
    chat_id = update.effective_chat.id
    thread_id = update.message.message_thread_id if update.message else None
    ck = _conv_key(chat_id, thread_id)
    # Security: daily cost cap
    today_cost = _get_today_cost()
    if today_cost >= DAILY_COST_CAP:
        logger.warning("Daily cost cap reached: $%.2f >= $%.2f", today_cost, DAILY_COST_CAP)
        await update.message.reply_text("Daily API cost limit reached.")
        return
    # #1: Queue — reject if already busy on this topic
    lock = _claude_locks[ck]
    if lock.locked():
        await update.message.reply_text(f"⏳ {display_name} is already working on a task. Please wait.")
        return
    async with lock:
        # #6: Mark topic as in Claude mode
        import time as _t
        _claude_active[ck] = _t.time()
        # #3: Session persistence + expiry recovery
        session_key = f"{persona_id}:{chat_id}:{thread_id or 0}"
        sessions = _load_bot_sessions()
        session_id = sessions.get(session_key)
        status_msg = await update.message.reply_text(f"🔍 {display_name} is working on it...")
        await update.effective_chat.send_action("typing")
        result = ""
        cost_usd = 0.0
        duration_ms = 0
        for attempt in range(2):  # retry once on session expiry
            try:
                cc_args = [CLAUDE_BIN, "-p",
                           "--model", "claude-opus-4-6",
                           "--allowedTools", "Bash,Read,Glob,Grep,WebFetch,WebSearch",
                           "--output-format", "json"]
                if session_id:
                    cc_args += ["--resume", session_id]
                else:
                    cc_args += ["--system-prompt", system_prompt]
                cc_env = os.environ.copy()
                # Drop nested-invocation markers so the CLI runs standalone.
                for _k in ("CLAUDECODE", "CLAUDE_CODE_ENTRYPOINT"):
                    cc_env.pop(_k, None)
                proc = await asyncio.create_subprocess_exec(
                    *cc_args,
                    cwd=PROJECT_DIR,
                    stdin=asyncio.subprocess.PIPE,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=cc_env,
                )
                stdout, stderr = await asyncio.wait_for(proc.communicate(input=text.encode()), timeout=180)
                raw = stdout.decode().strip()
                if not raw:
                    err = stderr.decode().strip()
                    # #3: Session expired — retry without resume
                    if session_id and ("session" in err.lower() or "resume" in err.lower()):
                        logger.warning("Claude session expired for %s, starting fresh", session_key)
                        sessions.pop(session_key, None)
                        _save_bot_sessions(sessions)
                        session_id = None
                        continue
                    result = "⚠️ No output. Check logs."
                    break
                data = json.loads(raw)
                result = data.get("result", "(no result)")
                # Save session for follow-ups
                sid = data.get("session_id")
                if sid:
                    sessions[session_key] = sid
                    _save_bot_sessions(sessions)
                # #4: Cost tracking
                cost_usd = data.get("cost_usd", 0)
                duration_ms = data.get("duration_ms", 0)
                if cost_usd:
                    _log_claude_cost(persona_id, session_key, cost_usd, duration_ms)
                    logger.info("Claude cost: $%.4f (%dms) for %s", cost_usd, duration_ms, session_key)
                break
            except asyncio.TimeoutError:
                result = "⚠️ Task timed out (180s)"
                break
            except json.JSONDecodeError as e:
                # #3: Bad JSON might mean session error — retry fresh
                if attempt == 0 and session_id:
                    logger.warning("Claude JSON error for %s, retrying fresh: %s", session_key, e)
                    sessions.pop(session_key, None)
                    _save_bot_sessions(sessions)
                    session_id = None
                    continue
                result = f"⚠️ Failed to parse response"
                break
            except Exception as e:
                logger.error("Claude Code error: %s", e)
                result = "⚠️ Something went wrong. Check logs."
                break
        # Delete status message, send result
        try:
            await status_msg.delete()
        except Exception:
            pass
        # Store in conversation + memory (skip error placeholders)
        mk = _mem_key(chat_id, thread_id)
        is_error = result.startswith("⚠️") or result.startswith("❌")
        convs[ck].append({"role": "user", "content": text})
        if not is_error:
            # NOTE(review): assuming memory writes are also skipped for error
            # results, matching the comment above — confirm against original.
            convs[ck].append({"role": "assistant", "content": result})
            mem.store(mk, "user", text)
            mem.store(mk, "assistant", result)
        _user_id = update.effective_user.id if update.effective_user else 0
        log_message(persona_id, _user_id, "user", text, "text")
        log_message(persona_id, _user_id, "assistant", result, "text", model="claude-opus")
        # Telegram caps a message at 4096 chars — split longer results.
        if len(result) <= 4096:
            await update.message.reply_text(result)
        else:
            for i in range(0, len(result), 4096):
                await update.message.reply_text(result[i : i + 4096])
# ── Persona /menu command (layered inline keyboard) ─────────────────────
# Category definitions: (key, emoji_label, [(cmd_name, description), ...])
# Content commands are added dynamically based on persona config
_PMENU_CATEGORIES = [
("chat", "💬 Chat", [("clear", "Clear conversation"), ("status", "Bot status")]),
("content", "📰 Content", []), # populated dynamically
("subscribe", "🔔 Subscribe", [("subscribe", "Subscribe digests"), ("unsubscribe", "Unsubscribe")]),
("info", "ℹ️ Info", [("start", "About this bot")]),
]
def _build_persona_content_cmds() -> list[tuple[str, str]]:
    """Build content command list based on persona config flags."""
    commands: list[tuple[str, str]] = []
    if digest_enabled and news_module != "none":
        commands.append(("news", "News digest"))
    if twitter_enabled:
        commands.append(("xcurate", "X curation"))
        commands.append(("tweets", "Twitter digest"))
    if reddit_enabled and reddit_subs:
        commands.append(("reddit", "Reddit digest"))
    if yields_enabled:
        commands.append(("yields", "Yield report"))
    return commands
def _pmenu_top_keyboard() -> InlineKeyboardMarkup:
    """Build the top-level persona menu keyboard (two buttons per row)."""
    content_cmds = _build_persona_content_cmds()
    rows = []
    pending = []
    for key, label, _cmds in _PMENU_CATEGORIES:
        # Hide the content category entirely when no content commands exist.
        if key == "content" and not content_cmds:
            continue
        pending.append(InlineKeyboardButton(label, callback_data=f"pmenu:{key}"))
        if len(pending) == 2:
            rows.append(pending)
            pending = []
    if pending:
        rows.append(pending)  # odd button out gets its own row
    return InlineKeyboardMarkup(rows)
def _pmenu_sub_keyboard(category_key: str) -> InlineKeyboardMarkup:
    """Build sub-command keyboard for a persona menu category."""
    selected = []
    for key, _label, cmd_list in _PMENU_CATEGORIES:
        if key == category_key:
            selected = list(cmd_list)
            break
    # Content commands come from persona feature flags, not the static table.
    if category_key == "content":
        selected = _build_persona_content_cmds()
    rows = []
    pending = []
    for cmd, desc in selected:
        pending.append(InlineKeyboardButton(
            f"/{cmd} — {desc}",
            callback_data=f"pmenu:cmd:{cmd}",
        ))
        if len(pending) == 2:
            rows.append(pending)
            pending = []
    if pending:
        rows.append(pending)
    rows.append([InlineKeyboardButton("← Back", callback_data="pmenu:back")])
    return InlineKeyboardMarkup(rows)
def _pmenu_category_label(key: str) -> str:
    """Resolve a category key to its display label; the key itself if unknown."""
    return next((label for k, label, _cmds in _PMENU_CATEGORIES if k == key), key)
async def cmd_menu(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Show layered inline command menu for persona bot."""
    chat_type = update.effective_chat.type
    thread_id = update.message.message_thread_id if update.message else None
    # Only answer in private chats or in this persona's registered topics.
    if chat_type != "private" and not _is_my_thread(chat_type, update.effective_chat.id, thread_id):
        return
    if not _check_private_user(update):
        return
    await update.message.reply_text(
        f"<b>📋 {display_name} — Menu</b>\nTap a category:",
        parse_mode="HTML",
        reply_markup=_pmenu_top_keyboard(),
    )
# Late-bound dispatch map: populated after all command functions are defined
_cmd_dispatch: dict = {}
async def handle_menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle persona menu inline keyboard callbacks.

    Callback data formats: "pmenu:back" (top menu), "pmenu:cmd:<name>"
    (dispatch a command), "pmenu:<category>" (show sub-commands).
    """
    query = update.callback_query
    if not query or not query.data:
        return
    # Check user is allowed
    user_id = query.from_user.id if query.from_user else 0
    if not _is_allowed_user(user_id):
        await query.answer("Not authorized")
        return
    data = query.data  # pmenu:category | pmenu:back | pmenu:cmd:name
    if data == "pmenu:back":
        await query.answer()
        await query.edit_message_text(
            f"<b>📋 {display_name} — Menu</b>\nTap a category:",
            parse_mode="HTML",
            reply_markup=_pmenu_top_keyboard(),
        )
        return
    if data.startswith("pmenu:cmd:"):
        cmd = data.split(":", 2)[2]
        await query.answer(f"/{cmd}")
        await query.edit_message_text(f"▶️ /{cmd}")
        # _cmd_dispatch is filled in late, after all handlers are defined.
        handler_fn = _cmd_dispatch.get(cmd)
        if handler_fn:
            await handler_fn(update, context)
        return
    # Category selected — show sub-commands
    category_key = data.split(":", 1)[1]
    label = _pmenu_category_label(category_key)
    await query.answer()
    await query.edit_message_text(
        f"<b>📋 {label}</b>\nTap a command:",
        parse_mode="HTML",
        reply_markup=_pmenu_sub_keyboard(category_key),
    )
# ── Topic event handlers ──────────────────────────────────────────────────
async def handle_topic_created(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Cache the name of a newly created forum topic."""
    msg = update.message
    created = msg.forum_topic_created if msg else None
    if not created:
        return
    _cache_thread(update.effective_chat.id, msg.message_thread_id, created.name)
async def handle_topic_edited(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Refresh the cached name when a forum topic is renamed."""
    msg = update.message
    edited = msg.forum_topic_edited if msg else None
    if not edited:
        return
    if edited.name:  # rename events without a name are ignored
        _cache_thread(update.effective_chat.id, msg.message_thread_id, edited.name)
# ── Command handlers ──────────────────────────────────────────────────────
async def register(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/register — run this inside a forum topic thread to register it for this bot."""
    if not update.message:
        return
    thread_id = update.message.message_thread_id
    if not thread_id:
        await update.message.reply_text(_r("register_no_thread"))
        return
    # Use provided name or fall back to bot's first configured topic name
    if context.args:
        name = " ".join(context.args)
    else:
        name = next(iter(topic_names), "") if topic_names else ""
    if not name:
        await update.message.reply_text("/register <topic name>")
        return
    _cache_thread(update.effective_chat.id, thread_id, name)
    await update.message.reply_text(_r("register_ok", name=name))
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/start — send the persona's about text (topic-gated in groups)."""
    chat = update.effective_chat
    thread_id = update.message.message_thread_id if update.message else None
    if chat.type != "private" and not _is_my_thread(chat.type, chat.id, thread_id):
        return
    await update.message.reply_text(_r("start"))
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/status — report bot health.

    Sections: uptime, configured topics, memory-DB row count, LLM API
    reachability, last-sent dates of digest flag files, and Twitter cookie
    age (when Twitter is enabled).

    Fixes vs. original: removed the unused ``test_resp`` local (only
    success/failure of the LLM call matters), and added an early return when
    there is no message to reply to (the original guarded ``msg`` for
    ``thread_id`` but then called ``msg.reply_text`` unguarded).
    """
    chat_type = update.effective_chat.type
    msg = update.effective_message
    thread_id = msg.message_thread_id if msg else None
    if chat_type != "private" and not _is_my_thread(chat_type, update.effective_chat.id, thread_id):
        return
    if msg is None:
        # Nothing to reply to.
        return
    lines = [f"<b>🤖 {display_name} — Status</b>"]
    # Uptime since process start (_start_time is set at module init).
    import time as _time
    uptime_secs = int(_time.time() - _start_time)
    h, m = divmod(uptime_secs // 60, 60)
    lines.append(f"⏱ Uptime: {h}h {m}m")
    # Topics
    lines.append(f"📌 Topics: {', '.join(topic_names) if topic_names else 'none'}")
    # Memory DB — best effort: report the error in the status line instead of failing.
    try:
        n_mem = mem._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
        lines.append(f"🧠 Messages stored: {n_mem}")
    except Exception as e:
        lines.append(f"🧠 Memory DB: ❌ {e}")
    # LLM API test (uses primary model from fallback chain). The response
    # content is discarded — only whether the call succeeds matters.
    try:
        await asyncio.to_thread(
            _llm_client.chat.completions.create,
            model=_llm_model,
            max_tokens=5,
            messages=[{"role": "user", "content": "ping"}],
        )
        lines.append(f"🔌 LLM ({_llm_model}): ✅ OK")
    except Exception as e:
        lines.append(f"🔌 LLM ({_llm_model}): ❌ {type(e).__name__}: {e}")
    # Digest flag files store the date each digest was last sent; warn when stale.
    today_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    for flag_path, label in [
        (Path(__file__).parent / f".digest_sent_x_{persona_id}", "X digest"),
        (Path(__file__).parent / f".digest_sent_reddit_{persona_id}", "Reddit digest"),
        (Path(__file__).parent / f".digest_sent_{persona_id}", "News digest"),
    ]:
        if flag_path.exists():
            sent_date = flag_path.read_text().strip()
            icon = "✅" if sent_date == today_str else "⚠️"
            lines.append(f"{icon} {label}: last sent {sent_date}")
    # Twitter cookie age (X bots) — cookies older than ~13h get a warning icon.
    cookies_path = Path(__file__).parent / "twitter_cookies.json"
    if cookies_path.exists() and twitter_enabled:
        import os as _os
        age_secs = int(_time.time() - _os.path.getmtime(cookies_path))
        age_h = age_secs // 3600
        icon = "✅" if age_h < 13 else "⚠️"
        lines.append(f"{icon} Twitter cookies: {age_h}h old")
    await msg.reply_text("\n".join(lines), parse_mode="HTML")
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/clear — reset this chat/thread's conversation and flush staged
    messages into long-term memory."""
    chat = update.effective_chat
    thread_id = update.message.message_thread_id if update.message else None
    if chat.type != "private" and not _is_my_thread(chat.type, chat.id, thread_id):
        return
    if not _check_private_user(update):
        return
    conv_key = _conv_key(chat.id, thread_id)
    convs[conv_key].clear()
    compressor.clear(conv_key)
    saved = mem.flush_staging(_mem_key(chat.id, thread_id))
    reply = _r("clear_saved", n=saved) if saved else _r("clear_empty")
    await update.message.reply_text(reply)
async def subscribe(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/subscribe — add this chat to the daily-digest subscriber list."""
    chat = update.effective_chat
    thread_id = update.message.message_thread_id if update.message else None
    if chat.type != "private" and not _is_my_thread(chat.type, chat.id, thread_id):
        return
    if not _check_private_user(update):
        return
    subscribers = _load_subs()
    if chat.id in subscribers:
        await update.message.reply_text(_r("subscribe_already"))
        return
    subscribers.add(chat.id)
    _save_subs(subscribers)
    await update.message.reply_text(_r("subscribe_ok"))
async def unsubscribe(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/unsubscribe — remove this chat from the daily-digest subscriber list."""
    chat = update.effective_chat
    thread_id = update.message.message_thread_id if update.message else None
    if chat.type != "private" and not _is_my_thread(chat.type, chat.id, thread_id):
        return
    if not _check_private_user(update):
        return
    subscribers = _load_subs()
    if chat.id not in subscribers:
        await update.message.reply_text(_r("unsubscribe_not"))
        return
    subscribers.discard(chat.id)
    _save_subs(subscribers)
    await update.message.reply_text(_r("unsubscribe_ok"))
async def news_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/news — generate the digest and send each part to this chat/thread."""
    chat = update.effective_chat
    thread_id = update.message.message_thread_id if update.message else None
    if chat.type != "private" and not _is_my_thread(chat.type, chat.id, thread_id):
        return
    if not _check_private_user(update):
        return
    await update.message.reply_text(_r("news_loading"))
    try:
        if news_module == "crypto_news":
            parts = await generate_crypto_digest("")
        else:
            parts = await generate_full_digest("")
        for part in parts:
            # A part is either a plain string (sent as HTML) or a dict
            # carrying explicit formatting metadata.
            if isinstance(part, dict):
                text = part["text"]
                pm = part.get("parse_mode")
                markup = part.get("reply_markup")
            else:
                text, pm, markup = part, "HTML", None
            await context.bot.send_message(
                chat_id=chat.id,
                text=text,
                message_thread_id=thread_id,
                parse_mode=pm,
                reply_markup=markup,
            )
    except Exception as e:
        logger.error("Digest error: %s", e)
        await update.message.reply_text("⚠️ Something went wrong. Check logs.")
async def yields_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """/yields — generate the yield report and send each part to this chat/thread."""
    chat = update.effective_chat
    thread_id = update.message.message_thread_id if update.message else None
    if chat.type != "private" and not _is_my_thread(chat.type, chat.id, thread_id):
        return
    await update.message.reply_text(_r("yields_loading"))
    try:
        for part in await generate_yield_report():
            await context.bot.send_message(
                chat_id=chat.id,
                text=part,
                message_thread_id=thread_id,
            )
    except Exception as e:
        logger.error("Yields error: %s", e)
        await update.message.reply_text("⚠️ Something went wrong. Check logs.")