-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
99 lines (78 loc) · 3.06 KB
/
bot.py
File metadata and controls
99 lines (78 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import html
import os
import subprocess
import sys
import threading

import telebot
# --- CONFIGURATION ---
# Both settings are read from environment variables; the script exits with
# status 1 if either one is missing or unusable.
API_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

try:
    ALLOWED_USER_ID = int(os.getenv('ALLOWED_USER_ID', '0'))
except ValueError:
    # An empty or non-numeric value would otherwise surface as a raw
    # traceback from int(); report it like the other configuration errors.
    print("ERROR: ALLOWED_USER_ID environment variable must be a numeric Telegram user ID")
    sys.exit(1)

if not API_TOKEN:
    print("ERROR: TELEGRAM_BOT_TOKEN environment variable is not set")
    sys.exit(1)

# 0 is the fallback used when the variable is absent, so it means "not set".
if ALLOWED_USER_ID == 0:
    print("ERROR: ALLOWED_USER_ID environment variable is not set")
    sys.exit(1)

bot = telebot.TeleBot(API_TOKEN)
def run_script_task(chat_id, command):
    """Run a shell command and report its result to a Telegram chat.

    Meant to be used as a thread target so a long-running command does not
    block the bot's message handling.

    Args:
        chat_id: Telegram chat that receives the status and output messages.
        command: Command line passed to the system shell as-is.

    Any exception raised while starting the command or sending messages is
    caught and reported to the chat as "Execution failed".
    """
    try:
        # shell=True is deliberate: the command string is a full shell
        # command line supplied by the authorized user.
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Replace undecodable bytes instead of raising UnicodeDecodeError
            # when the command emits output in an unexpected encoding.
            errors="replace",
        )

        # Command text and output are arbitrary; HTML parse mode lets them be
        # escaped reliably (legacy Markdown cannot escape a backtick inside a
        # code block, so such output made Telegram reject the message).
        bot.send_message(
            chat_id,
            f"🚀 Started task: <code>{html.escape(command, quote=False)}</code>",
            parse_mode="HTML",
        )

        stdout, stderr = process.communicate()

        # Only the last 4000 characters are sent to stay under Telegram's
        # 4096-character message limit.
        if stdout:
            bot.send_message(
                chat_id,
                f"✅ <b>Output</b>:\n<pre>{html.escape(stdout[-4000:], quote=False)}</pre>",
                parse_mode="HTML",
            )
        if stderr:
            bot.send_message(
                chat_id,
                f"⚠️ <b>Error</b>:\n<pre>{html.escape(stderr[-4000:], quote=False)}</pre>",
                parse_mode="HTML",
            )

        if process.returncode != 0:
            # Do not claim success when the command reported failure.
            bot.send_message(chat_id, f"❌ Command exited with code {process.returncode}")
        elif not stdout and not stderr:
            bot.send_message(chat_id, "✅ Command executed successfully (no output)")
    except Exception as e:
        bot.send_message(chat_id, f"❌ Execution failed: {str(e)}")
@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    """Answer /start and /help with the bot's usage text.

    A sender whose Telegram user id differs from ALLOWED_USER_ID only gets
    an "Unauthorized" reply.
    """
    if message.from_user.id == ALLOWED_USER_ID:
        # The lines are joined with single newlines into one Markdown message.
        usage = "\n".join((
            "🤖 *Windows Command Runner Bot*",
            "Available commands:",
            "• `/run <command>` - Execute any Windows command or script",
            "• `/help` - Show this message",
            "Examples:",
            "• `/run dir`",
            '• `/run powershell -Command "Get-Process"`',
            "• `/run python script.py`",
            "• `/run C:\\path\\to\\script.ps1`",
            "Only authorized users can execute commands.",
        ))
        bot.send_message(message.chat.id, usage, parse_mode="Markdown")
    else:
        bot.reply_to(message, "🚫 Unauthorized.")
@bot.message_handler(commands=['run'])
def execute_command(message):
    """Handle ``/run <command>`` by running the command in a background thread.

    Senders other than ALLOWED_USER_ID get an "Unauthorized" reply. If no
    command text follows ``/run``, a usage hint is sent instead.
    """
    # Security: Only the owner can run commands
    if message.from_user.id != ALLOWED_USER_ID:
        bot.reply_to(message, "🚫 Unauthorized.")
        return

    # Drop the first whitespace-delimited token (the command itself). This
    # also covers "/run@BotName ..." and a newline or tab after "/run", which
    # a literal replace of '/run ' left in place and then passed to the shell.
    parts = message.text.split(maxsplit=1)
    command = parts[1].strip() if len(parts) > 1 else ""

    if not command:
        bot.reply_to(message, "Usage: `/run python3 myscript.py` or `/run powershell -Command ...`", parse_mode="Markdown")
        return

    # Run in a separate thread to prevent the bot from freezing during long tasks
    thread = threading.Thread(
        target=run_script_task,
        args=(message.chat.id, command),
        daemon=True,
    )
    thread.start()
# Startup banner, emitted as one print call with one item per output line.
print(
    "=" * 50,
    "🤖 Windows Command Runner Bot is starting...",
    "=" * 50,
    f"Authorized User ID: {ALLOWED_USER_ID}",
    "Ready to execute commands from Telegram",
    "=" * 50,
    sep="\n",
)

# Poll Telegram for updates until interrupted or an error escapes the loop.
try:
    bot.infinity_polling()
except KeyboardInterrupt:
    print("\nBot stopped by user")
except Exception as exc:
    print(f"Bot error: {exc}")