forked from Sideloading-Research/telegram_sideload
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
603 lines (494 loc) · 24.3 KB
/
main.py
File metadata and controls
603 lines (494 loc) · 24.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
import os
import asyncio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyParameters
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
CallbackQueryHandler,
)
import bot_config
from utils.mind_data_manager import MindDataManager
from conversation_manager import ConversationManager
from app_logic import AppLogic
from telegram.constants import ChatType, MessageEntityType, ChatAction
from telegram.error import TelegramError, BadRequest, TimedOut, NetworkError, RetryAfter
from config import BOT_ANSWERS_IN_GROUPS_ONLY_WHEN_MENTIONED7, RATE_LIMIT_EXCEEDED_MESSAGE
from config_sanity_checks import run_sanity_checks
import config
from utils.rate_limiter import is_global_rate_limited
from utils.group_settings import get_group_settings
import utils.group_usage_tracker as group_tracker
from utils.constants import c
# Initialize managers and services.
# These are global instances for the bot's lifecycle; every handler below
# reads them as module-level names.
MIND_MANAGER = MindDataManager.get_instance()
CONVERSATION_MANAGER = ConversationManager(mind_manager=MIND_MANAGER)

# Token, allow-lists and trigger words are all fetched via bot_config functions.
TOKEN = bot_config.get_token()
ALLOWED_USER_IDS = bot_config.get_allowed_user_ids()
ALLOWED_GROUP_IDS = bot_config.get_allowed_group_ids()
TRIGGER_WORDS_LIST = bot_config.get_trigger_words()  # Consulted by _is_bot_mentioned for group messages

# Application logic object used by the handlers for authorization checks,
# plugin management and request processing.
APPLICATION_LOGIC = AppLogic(
    conversation_manager=CONVERSATION_MANAGER,
    allowed_user_ids=ALLOWED_USER_IDS,
    allowed_group_ids=ALLOWED_GROUP_IDS
)
async def reply_text_wrapper(
    update: Update,
    context: ContextTypes.DEFAULT_TYPE,
    text: str,
    reply_markup=None,
    parse_mode=None,
    disable_web_page_preview=None,
    disable_notification=None,
    reply_to_message_id=None,
    allow_sending_without_reply=True,
    max_retries: int = 3,
    fallback_to_send_message: bool = True
) -> bool:
    """
    Send a text message with truncation, retries and an optional fallback.

    The message is sent as a reply via ``update.message.reply_text`` when the
    update carries a message; otherwise (and only if
    ``fallback_to_send_message`` is true) it is sent to
    ``update.effective_chat`` via ``context.bot.send_message``.

    Args:
        update: The update object
        context: The context object
        text: The text to send; longer than 4096 characters is truncated
        reply_markup: Optional inline keyboard markup
        parse_mode: Optional parse mode (e.g., 'HTML', 'Markdown')
        disable_web_page_preview: Optional boolean to disable link previews
        disable_notification: Optional boolean to send without notification
        reply_to_message_id: Optional message ID to reply to
        allow_sending_without_reply: If True, send even if reply_to_message_id is invalid
        max_retries: Maximum number of send attempts (for any retryable error,
            not only rate limits); 0 or less means nothing is sent
        fallback_to_send_message: If True, fallback to context.bot.send_message
            when the update has no message to reply to

    Returns:
        bool: True if message was sent successfully, False otherwise
    """
    if not text:
        print("Warning: Attempted to send empty message")
        return False

    # Truncate message if too long (Telegram limit is 4096 characters)
    if len(text) > 4096:
        print(f"Warning: Message too long ({len(text)} chars), truncating to 4096")
        text = text[:4093] + "..."

    # Identical for every attempt, so build it once outside the retry loop.
    common_args = {
        "text": text,
        "reply_markup": reply_markup,
        "parse_mode": parse_mode,
        "disable_web_page_preview": disable_web_page_preview,
        "disable_notification": disable_notification,
    }

    for attempt in range(max_retries):
        try:
            if update.message and hasattr(update.message, 'reply_text'):
                # If caller provides reply_to_message_id, that takes precedence.
                # Otherwise reply to the message that triggered this update.
                effective_reply_to_msg_id = (
                    reply_to_message_id
                    if reply_to_message_id is not None
                    else update.message.message_id
                )
                reply_params_obj = ReplyParameters(
                    message_id=effective_reply_to_msg_id,
                    allow_sending_without_reply=allow_sending_without_reply
                )
                await update.message.reply_text(
                    **common_args,
                    reply_parameters=reply_params_obj
                )
                return True

            if fallback_to_send_message and update.effective_chat:
                send_args = {**common_args}
                if reply_to_message_id is not None:
                    # Caller wants to reply to a specific message via send_message
                    send_args["reply_parameters"] = ReplyParameters(
                        message_id=reply_to_message_id,
                        allow_sending_without_reply=allow_sending_without_reply
                    )
                else:
                    # Not replying to a specific message via send_message.
                    # Pass allow_sending_without_reply directly, as no ReplyParameters are involved.
                    send_args["allow_sending_without_reply"] = allow_sending_without_reply
                await context.bot.send_message(
                    chat_id=update.effective_chat.id,
                    **send_args
                )
                return True

            print("Error: No valid method to send message (no update.message or effective_chat)")
            return False
        except BadRequest as e:
            # The request itself was rejected (e.g. unparsable markup). Sending
            # the very same payload again cannot succeed, so do not sleep and retry.
            print(f"Error sending message (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}")
            print("Request rejected as invalid; not retrying")
            return False
        except Exception as e:
            print(f"Error sending message (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}")
            # If this was the last attempt, give up
            if attempt == max_retries - 1:
                print(f"Failed to send message after {max_retries} attempts")
                return False

            # Exponential backoff: 2^attempt seconds (1s, 2s, 4s, 8s, etc.)
            wait_time = 2 ** attempt
            # Special handling for rate limit errors - use their suggested wait time
            retry_after = getattr(e, 'retry_after', None)
            if retry_after is not None:
                # The value may be a plain number of seconds or a timedelta,
                # depending on the library version; asyncio.sleep needs seconds.
                if hasattr(retry_after, 'total_seconds'):
                    retry_after = retry_after.total_seconds()
                wait_time = retry_after
                print(f"Rate limited. Waiting {wait_time} seconds as requested by Telegram")
            else:
                print(f"Waiting {wait_time} seconds before retry...")
            await asyncio.sleep(wait_time)

    # Only reached when max_retries <= 0 (the loop body never ran).
    return False
async def send_typing_periodically(context: ContextTypes.DEFAULT_TYPE, chat_id: int):
    """Re-send the "typing" chat action to ``chat_id`` every 4 seconds until cancelled."""
    refresh_seconds = 4
    try:
        while True:
            await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
            await asyncio.sleep(refresh_seconds)
    except asyncio.CancelledError:
        # Expected exit path: callers stop the indicator by cancelling this task.
        return
    except Exception as err:
        # Any other failure ends the loop; it is only reported, never raised.
        print(f"Error in send_typing_periodically: {err}")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /start with a greeting and an inline "Start" button."""
    sender = update.effective_user
    start_button = InlineKeyboardButton("Start", callback_data="start_game")
    # The button's callback data is matched by the "^start_game$" callback handler.
    markup = InlineKeyboardMarkup([[start_button]])
    greeting = f"Hello {sender.first_name}, ich bin dein hilfreicher Assistent! Clicke 'Start'"
    await reply_text_wrapper(
        update=update,
        context=context,
        text=greeting,
        reply_markup=markup,
    )
async def start_new_game(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Show the opening prompt.

    Sends it as a new message when the update carries a message, or edits the
    button's message in place when the update is a callback query.
    """
    prompt = "Wie kann ich dir helfen?"
    if update.message:
        # Triggered by a command or plain message.
        await reply_text_wrapper(update, context, prompt)
        return
    pressed = update.callback_query
    if pressed:
        # Triggered by a button press.
        await pressed.edit_message_text(prompt)
async def start_game_callback(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Acknowledge the "Start" button press, then show the opening prompt."""
    # Answer the callback query first, before doing anything else.
    await update.callback_query.answer()
    # start_new_game emits the actual "Wie kann ich dir helfen?" text.
    await start_new_game(update, context)
async def restrict(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Tell the sender that this user (private chat) or this group is not authorized."""
    user_id = update.effective_user.id
    chat = update.effective_chat
    chat_id = chat.id
    # Private chats get a per-user message; every other chat type gets the group message.
    denial = (
        f"Keine Berechtigung für user_id {user_id}."
        if chat.type == 'private'
        else f"This group (ID: {chat_id}) is not authorized to use this bot."
    )
    await reply_text_wrapper(update, context, denial)
def is_allowed(update: Update) -> bool:
    """Return AppLogic's authorization verdict for the update's chat type, user and chat."""
    chat = update.effective_chat
    return APPLICATION_LOGIC.check_authorization(
        chat.type,
        update.effective_user.id,
        chat.id,
    )
# Plugin Management Commands
async def plugins_status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """List every plugin with its enabled/disabled state (authorized callers only)."""
    if not is_allowed(update):
        await restrict(update, context)
        return

    # Collect the header and one line per plugin, then join once.
    parts = ["**Plugin Status:**\n\n"]
    for name, enabled in APPLICATION_LOGIC.get_plugin_status().items():
        marker = "✅" if enabled else "❌"
        parts.append(f"{marker} `{name}`: {'Enabled' if enabled else 'Disabled'}\n")

    await reply_text_wrapper(update, context, "".join(parts), parse_mode="Markdown")
async def enable_plugin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Enable the plugin named by the first command argument (authorized callers only)."""
    if not is_allowed(update):
        await restrict(update, context)
        return

    arguments = context.args
    if not arguments:
        await reply_text_wrapper(update, context, "Usage: /enable_plugin <plugin_name>")
        return

    plugin_name = arguments[0]
    # enable_plugin's truthiness decides between the success and "not found" reply.
    was_enabled = APPLICATION_LOGIC.enable_plugin(plugin_name)
    outcome = (
        f"✅ Plugin `{plugin_name}` enabled."
        if was_enabled
        else f"❌ Plugin `{plugin_name}` not found."
    )
    await reply_text_wrapper(update, context, outcome, parse_mode="Markdown")
async def disable_plugin_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Disable the plugin named by the first command argument (authorized callers only)."""
    if not is_allowed(update):
        await restrict(update, context)
        return

    arguments = context.args
    if not arguments:
        await reply_text_wrapper(update, context, "Usage: /disable_plugin <plugin_name>")
        return

    plugin_name = arguments[0]
    # disable_plugin's truthiness decides between the success and "not found" reply.
    was_disabled = APPLICATION_LOGIC.disable_plugin(plugin_name)
    outcome = (
        f"❌ Plugin `{plugin_name}` disabled."
        if was_disabled
        else f"❌ Plugin `{plugin_name}` not found."
    )
    await reply_text_wrapper(update, context, outcome, parse_mode="Markdown")
async def enable_all_plugins_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Switch every plugin on and confirm it (authorized callers only)."""
    if is_allowed(update):
        APPLICATION_LOGIC.enable_all_plugins()
        await reply_text_wrapper(update, context, "✅ All plugins enabled.")
    else:
        await restrict(update, context)
async def disable_all_plugins_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Switch every plugin off and confirm it (authorized callers only)."""
    if is_allowed(update):
        APPLICATION_LOGIC.disable_all_plugins()
        await reply_text_wrapper(update, context, "❌ All plugins disabled.")
    else:
        await restrict(update, context)
def _extract_message_content(update: Update) -> tuple[str, str | None, list | None]:
    """
    Pull the user-visible text and its entities out of an update.

    ``message.text`` wins over ``message.caption``; when neither is present
    (or the update has no message) a placeholder text is returned with no
    mention-check data.

    Returns:
        tuple[str, str | None, list | None]: (user_message_text, message_text_for_mention_check, message_entities)
    """
    placeholder = "<unsupported message type>"

    incoming = update.message
    if not incoming:
        # Not expected from a MessageHandler unless its filters are very broad.
        return placeholder, None, None

    print(f"Chat type: {incoming.chat.type}")

    if incoming.text:
        return incoming.text, incoming.text, incoming.entities
    if incoming.caption:
        # Media with a caption: the caption is the primary message content.
        return incoming.caption, incoming.caption, incoming.caption_entities

    # Neither text nor caption (e.g. sticker, voice message without caption).
    return placeholder, None, None
def _is_bot_mentioned(update: Update, context: ContextTypes.DEFAULT_TYPE, message_text: str | None, message_entities: list | None) -> bool:
    """
    Check whether a group message addresses the bot.

    The bot counts as addressed when the message (in a group or supergroup):
      1. contains an @mention of the bot's username or a text-mention of the bot's user id,
      2. is a reply to a message sent by the bot, or
      3. contains one of the configured trigger words (case-insensitive substring match).

    This function only answers "is mentioned?"; whether the bot replies at all
    is decided elsewhere.

    Args:
        update: The incoming update.
        context: Handler context; ``context.bot`` supplies the bot's username and id.
        message_text: Text (or caption) the entities refer to, or None.
        message_entities: Entities belonging to ``message_text``, or None.

    Returns:
        bool: True if the bot was addressed, False otherwise (always False
        outside group/supergroup chats).
    """
    if not update.message or update.message.chat.type not in [ChatType.GROUP, ChatType.SUPERGROUP]:
        return False

    # 1. Explicit mention via message entities.
    if message_text and context.bot.username and message_entities:
        bot_username_at = f"@{context.bot.username}".lower()
        # Telegram expresses entity offset/length in UTF-16 code units, not in
        # Python code points. Slicing the str directly is wrong as soon as an
        # emoji (or any non-BMP character) precedes the mention, so slice the
        # UTF-16 encoding instead (2 bytes per code unit).
        utf16_text = message_text.encode("utf-16-le", errors="surrogatepass")
        for entity in message_entities:
            if entity.type == MessageEntityType.MENTION:
                start = entity.offset * 2
                end = start + entity.length * 2
                mention_text = utf16_text[start:end].decode("utf-16-le", errors="replace")
                if mention_text.lower() == bot_username_at:
                    return True
            elif entity.type == MessageEntityType.TEXT_MENTION:
                if entity.user and entity.user.id == context.bot.id:
                    return True

    # 2. Reply to one of the bot's own messages.
    replied_to = update.message.reply_to_message
    if replied_to and replied_to.from_user and replied_to.from_user.id == context.bot.id:
        return True

    # 3. Trigger words. Both sides are lower-cased so the match really is
    # case-insensitive; empty entries are skipped because "" is a substring
    # of every message and would make the bot answer everything.
    if TRIGGER_WORDS_LIST and message_text:
        message_text_lower = message_text.lower()
        for word in TRIGGER_WORDS_LIST:
            if word and word.lower() in message_text_lower:
                return True

    return False
def _should_generate_ai_reply(chat_type: str, is_mentioned: bool) -> bool:
    """
    Decide whether an AI reply should be generated for this chat.

    Private chats always get one. Groups and supergroups get one
    unconditionally when BOT_ANSWERS_IN_GROUPS_ONLY_WHEN_MENTIONED7 is off,
    otherwise only when the bot was mentioned. Any other chat type gets none.
    """
    if chat_type == ChatType.PRIVATE:
        return True
    if chat_type not in [ChatType.GROUP, ChatType.SUPERGROUP]:
        return False
    # Group chat: the config flag decides whether a mention is required.
    return is_mentioned if BOT_ANSWERS_IN_GROUPS_ONLY_WHEN_MENTIONED7 else True
async def _process_and_reply(update: Update, context: ContextTypes.DEFAULT_TYPE,
                             user_message_text: str, generate_ai_reply: bool, should_send_reply: bool):
    """
    Hand the message to AppLogic and, if requested, send the answer back.

    When ``generate_ai_reply`` is set, the global rate limit is checked first
    (a limited request is dropped, with a warning message only if the limiter
    asks for one) and a typing indicator runs while the request is processed.
    Group usage is incremented only after the answer was actually sent.
    """
    sender = update.effective_user
    chat = update.effective_chat
    user_id = sender.id
    chat_id = chat.id

    request_kwargs = dict(
        user_id=user_id,
        raw_user_message=user_message_text,
        chat_id=chat_id,
        chat_type=chat.type,
        generate_ai_reply=generate_ai_reply,
        username=sender.username,
        first_name=sender.first_name,
        last_name=sender.last_name,
    )

    typing_task = None
    if generate_ai_reply:
        # Rate limit check
        is_limited, should_warn = is_global_rate_limited()
        if is_limited:
            if should_warn:
                print(f"Rate limit exceeded. Blocking request from {user_id} and sending warning.")
                await reply_text_wrapper(update, context, RATE_LIMIT_EXCEEDED_MESSAGE)
            else:
                print(f"Rate limit exceeded. Silently blocking request from {user_id}.")
            return
        # Show "typing" for as long as the request is being processed.
        typing_task = asyncio.create_task(
            send_typing_periodically(context, chat_id)
        )

    try:
        # process_user_request is synchronous; run it in a worker thread.
        answer, provider_report, _ = await asyncio.to_thread(
            APPLICATION_LOGIC.process_user_request,
            **request_kwargs
        )
        if should_send_reply:
            if answer:
                if provider_report:
                    await reply_text_wrapper(update, context, provider_report)
                delivered = await reply_text_wrapper(update, context, answer)
                if delivered:
                    # Increment group limits only after successful send
                    await asyncio.to_thread(group_tracker.increment_group_usage, update, context)
            else:
                print(f"Logic indicated a reply should be sent in chat {chat_id}, but no answer was generated. Expected if not mentioned in restricted mode.")
    finally:
        if typing_task:
            typing_task.cancel()
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handle a non-command text/caption message.

    Flow:
      1. Reject unauthorized senders/chats.
      2. Execute a mode-switch command if the message is exactly one
         (optionally prefixed with the bot's @mention).
      3. Otherwise decide whether to generate and send an AI reply, enforce
         group limits, and delegate to ``_process_and_reply``.
    """
    user_message_text, message_text_for_mention, message_entities = _extract_message_content(update)

    # Authorization comes first. The mode-switch commands below change global
    # state (data source mode + forced refresh), so they must never run for a
    # sender that the authorization check rejects.
    if not is_allowed(update):
        await restrict(update, context)
        return

    # Check for admin commands
    command_text = user_message_text.strip()
    # Strip bot mention if present at the start (handle case-insensitivity for username)
    if context.bot.username:
        bot_mention_at = f"@{context.bot.username}"
        if command_text.lower().startswith(bot_mention_at.lower()):
            command_text = command_text[len(bot_mention_at):].strip()

    # NOTE(review): any authorized sender can switch modes; there is no separate
    # admin check here - confirm that is intended.
    if command_text == c.test_mode_command:
        config.set_data_source_mode("QUICK_TEST")
        MIND_MANAGER.force_refresh()
        await reply_text_wrapper(update, context, "quick test mode activated")
        return
    if command_text == c.normal_mode_command:
        config.set_data_source_mode("NORMAL")
        MIND_MANAGER.force_refresh()
        await reply_text_wrapper(update, context, "normal mode activated")
        return

    chat_type = update.effective_chat.type
    is_mentioned = _is_bot_mentioned(update, context, message_text_for_mention, message_entities)
    generate_ai_reply = _should_generate_ai_reply(chat_type, is_mentioned)

    # Determine if we should send the reply to Telegram.
    # Logic: If private, always yes. If group, depends on config and mention status.
    should_send_reply = chat_type == ChatType.PRIVATE or (
        chat_type in [ChatType.GROUP, ChatType.SUPERGROUP]
        and (not BOT_ANSWERS_IN_GROUPS_ONLY_WHEN_MENTIONED7 or is_mentioned)
    )

    # Check group limits before generating an AI reply.
    # This prevents AI costs and spam if the limit is reached.
    # We only check if we INTEND to generate a reply.
    if generate_ai_reply and not group_tracker.check_group_limits(update, context):
        # Limit exceeded or forbidden: stop silently (typical flood protection).
        print(f"Group limits reached for chat {update.effective_chat.id}. Dropping request.")
        return

    # The mention-stripped text is what gets passed on as the user's message.
    await _process_and_reply(update, context, command_text, generate_ai_reply, should_send_reply)
async def handle_group_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """
    Handle the /ask command: always generate an AI reply for the text after the command.

    Order of checks:
      1. Authorization.
      2. Input validation - an empty /ask only gets the usage hint and does
         not consume rate-limit quota or start the typing indicator.
      3. Group limits and global rate limit, matching what plain messages go
         through in ``handle_message`` / ``_process_and_reply``.
    Group usage is incremented only after the answer was actually sent.
    """
    if not is_allowed(update):
        await restrict(update, context)
        return

    # Get the text after the command
    user_input = ' '.join(context.args or [])
    if not user_input:
        await reply_text_wrapper(update, context, "Please provide a message after the command, like: /ask How are you?")
        return

    user = update.effective_user
    chat = update.effective_chat

    # Same flood/cost protection as for plain messages; dropped silently.
    if not group_tracker.check_group_limits(update, context):
        print(f"Group limits reached for chat {chat.id}. Dropping request.")
        return

    is_limited, should_warn = is_global_rate_limited()
    if is_limited:
        if should_warn:
            print(f"Rate limit exceeded. Blocking request from {user.id} and sending warning.")
            await reply_text_wrapper(update, context, RATE_LIMIT_EXCEEDED_MESSAGE)
        else:
            print(f"Rate limit exceeded. Silently blocking request from {user.id}.")
        return

    # Show "typing" for as long as the request is being processed.
    typing_task = asyncio.create_task(
        send_typing_periodically(context, chat.id)
    )
    try:
        # process_user_request is synchronous; run it in a worker thread.
        answer, provider_report, _ = await asyncio.to_thread(
            APPLICATION_LOGIC.process_user_request,
            user_id=user.id,
            raw_user_message=user_input,
            chat_id=chat.id,
            chat_type=chat.type,
            generate_ai_reply=True,  # Commands should always generate a reply
            username=user.username,
            first_name=user.first_name,
            last_name=user.last_name
        )
        if provider_report:  # Send provider switch report if any
            await reply_text_wrapper(update, context, provider_report)
        sent = await reply_text_wrapper(update, context, answer)
        if sent:
            # Increment group limits only after successful send
            await asyncio.to_thread(group_tracker.increment_group_usage, update, context)
    finally:
        typing_task.cancel()
def main():
    """
    Build the Telegram application, register all handlers and start polling.

    Handler layout:
      * group -1: a catch-all for messages from non-allowed private users and
        non-allowed groups. It sends the denial and then stops further
        handling of that update, so the command handlers in the default group
        neither answer unauthorized senders nor send a second denial.
      * default group: /start, /ask, the plugin commands, the "Start" button
        callback, and the text/caption handler for allowed chats.
    """
    # Imported here because the module-level telegram.ext import list does not
    # include it.
    from telegram.ext import ApplicationHandlerStop

    run_sanity_checks()
    app = Application.builder().token(TOKEN).build()

    # Private chats whose user is not in ALLOWED_USER_IDS
    private_chat_filter_restricted = filters.ChatType.PRIVATE & ~filters.User(user_id=ALLOWED_USER_IDS)
    # Group chats not in ALLOWED_GROUP_IDS; with no allowed groups configured,
    # every group chat is restricted.
    if ALLOWED_GROUP_IDS:
        group_chat_filter_restricted = filters.ChatType.GROUPS & ~filters.Chat(chat_id=ALLOWED_GROUP_IDS)
    else:
        group_chat_filter_restricted = filters.ChatType.GROUPS

    async def restrict_and_stop(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Send the denial, then prevent handlers in later groups from running."""
        await restrict(update, context)
        raise ApplicationHandlerStop

    restrict_handler = MessageHandler(
        private_chat_filter_restricted | group_chat_filter_restricted,
        restrict_and_stop
    )
    app.add_handler(restrict_handler, group=-1)

    app.add_handler(CommandHandler("start", start))
    app.add_handler(CommandHandler("ask", handle_group_command))

    # Plugin management commands
    app.add_handler(CommandHandler("plugins", plugins_status))
    app.add_handler(CommandHandler("enable_plugin", enable_plugin_cmd))
    app.add_handler(CommandHandler("disable_plugin", disable_plugin_cmd))
    app.add_handler(CommandHandler("enable_all_plugins", enable_all_plugins_cmd))
    app.add_handler(CommandHandler("disable_all_plugins", disable_all_plugins_cmd))

    app.add_handler(CallbackQueryHandler(start_game_callback, pattern="^start_game$"))

    # Text or captioned messages that are not commands, from allowed chats only.
    allowed_message_content_filter = (filters.TEXT | filters.CAPTION) & ~filters.COMMAND
    allowed_private_messages_filter = filters.ChatType.PRIVATE & filters.User(user_id=ALLOWED_USER_IDS) & allowed_message_content_filter
    if ALLOWED_GROUP_IDS:
        allowed_group_messages_filter = filters.ChatType.GROUPS & filters.Chat(chat_id=ALLOWED_GROUP_IDS) & allowed_message_content_filter
    else:
        # No allowed groups configured: never match group messages.
        allowed_group_messages_filter = filters.NONE

    app.add_handler(MessageHandler(
        allowed_private_messages_filter | allowed_group_messages_filter,
        handle_message
    ))

    print("Bot polling...")
    app.run_polling()
# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()