-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
229 lines (179 loc) · 7.48 KB
/
main.py
File metadata and controls
229 lines (179 loc) · 7.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import asyncio
import functools
import hmac
import json
import logging
import os
import threading
from collections.abc import Callable
from typing import Any

import requests
from dotenv import load_dotenv
from telegram import Update
from telegram.error import TelegramError
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler
from telegram.ext.filters import COMMAND

from feral_services import jackett, ru_torrent
from feral_services.jackett import TorrentInfo
# Load variables from a .env file before any os.getenv() call below reads them.
load_dotenv()

# Search results offered to each user: user id -> {item key -> torrent info}.
_RESULTS: dict[int, dict[str, TorrentInfo]] = {}
# Message id of the most recent search-results message sent to each user id.
_LAST_RESULT_DICT: dict[int, int] = {}

# JSON file holding the list of authorised user ids.
_USERS_FILE = "users.json"
_USERS: set[int] = set()
try:
    # The context manager closes the handle as soon as the list is parsed.
    with open(_USERS_FILE, encoding="utf-8") as _users_file:
        _USERS.update(json.load(_users_file))
except FileNotFoundError:
    # No user has been saved yet; save_user() creates the file on first use.
    pass

# Admin ids are kept as strings, taken from the comma-separated ADMINS variable.
# Whitespace around entries is ignored and empty entries are dropped.
_ADMINS: set[str] = {
    admin_id.strip()
    for admin_id in (os.getenv("ADMINS") or "").split(",")
    if admin_id.strip()
}

# Last measured size of the home directory in bytes (set by get_home_size()).
_LINUX_DIR_SIZE = 0
def save_user(user_id: int) -> None:
    """Authorise a user and persist the full set of authorised ids.

    Args:
        user_id: Telegram user id to add to ``_USERS``.

    The whole set is rewritten to ``_USERS_FILE`` as a JSON list.
    """
    _USERS.add(user_id)
    # The context manager guarantees the data is flushed and the handle closed
    # before returning, instead of relying on garbage collection.
    with open(_USERS_FILE, "w", encoding="utf-8") as users_file:
        json.dump(list(_USERS), users_file)
def auth_required(
    func: Callable[[Update, ContextTypes.DEFAULT_TYPE], Any],
) -> Callable[[Update, ContextTypes.DEFAULT_TYPE], Any]:
    """Restrict a handler to users whose id is in ``_USERS``.

    Args:
        func: Async handler taking ``(update, context)``.

    Returns:
        An async wrapper that returns ``None`` without calling ``func`` when
        the update has no effective user or no message, replies
        "Authorise pls" when the user id is not in ``_USERS``, and otherwise
        awaits ``func`` and returns its result.
    """

    # wraps() keeps the handler's __name__/__doc__ on the returned wrapper.
    @functools.wraps(func)
    async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Any:
        user = update.effective_user
        message = update.message
        if not user or not message:
            return None
        if user.id not in _USERS:
            await message.reply_text("Authorise pls")
            return None
        return await func(update, context)

    return wrapper
def admin_required(
    func: Callable[[Update, ContextTypes.DEFAULT_TYPE], Any],
) -> Callable[[Update, ContextTypes.DEFAULT_TYPE], Any]:
    """Restrict a handler to users whose id is listed in ``_ADMINS``.

    Args:
        func: Async handler taking ``(update, context)``.

    Returns:
        An async wrapper that returns ``None`` without calling ``func`` when
        the update has no effective user or no message, replies
        "Bad, no admin" when the user is not an admin, and otherwise awaits
        ``func`` and returns its result.
    """

    # wraps() keeps the handler's __name__/__doc__ on the returned wrapper.
    @functools.wraps(func)
    async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Any:
        user = update.effective_user
        message = update.message
        if not user or not message:
            return None
        # _ADMINS holds ids as strings, so compare the string form of the id.
        if str(user.id) not in _ADMINS:
            await message.reply_text("Bad, no admin")
            return None
        return await func(update, context)

    return wrapper
def get_home_size(start_new_thread: bool = True) -> None:
    """Measure the total size of all files under the user's home directory.

    The total, in bytes, is stored in the module-level ``_LINUX_DIR_SIZE``.

    Args:
        start_new_thread: When true, schedule another measurement on a
            ``threading.Timer`` that fires after 3600 seconds.
    """
    global _LINUX_DIR_SIZE
    total = 0
    home = os.path.expanduser("~")
    for dirpath, _, filenames in os.walk(home):
        for filename in filenames:
            try:
                total += os.path.getsize(os.path.join(dirpath, filename))
            except OSError:
                # Dangling symlinks, files removed while walking, or
                # unreadable entries must not abort the whole scan (which
                # would also prevent the next run from being scheduled).
                continue
    _LINUX_DIR_SIZE = total
    if start_new_thread:
        timer = threading.Timer(3600, get_home_size)
        # A daemon timer does not keep the process alive once the main
        # thread has finished.
        timer.daemon = True
        timer.start()
async def auth(update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/auth <password>``: authorise the sender if the password matches.

    Replies "Already authorized" when the sender is already in ``_USERS``,
    "Authorized" (after saving the user) when the supplied password equals the
    ``PASSWORD`` environment variable, and "Wrong password" otherwise.
    Updates without an effective user or message are ignored.
    """
    user = update.effective_user
    message = update.message
    if not user or not message:
        return
    if user.id in _USERS:
        await message.reply_text("Already authorized")
        return

    password = os.getenv("PASSWORD")
    # The candidate is everything after the command token. It must equal the
    # password exactly: a substring test would let one long message try many
    # candidate passwords at once.
    tokens = (message.text or "").split(maxsplit=1)
    supplied = tokens[1].strip() if len(tokens) > 1 else ""
    # compare_digest avoids leaking how much of the password matched via timing.
    if password and hmac.compare_digest(supplied.encode(), password.encode()):
        save_user(user.id)
        await message.reply_text("Authorized")
        return
    await message.reply_text("Wrong password")
@auth_required
async def spaceforce(_update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/spaceforce``: re-measure the home directory, then report it.

    The measurement is a one-off (it does not schedule a follow-up timer) and
    the reply is produced by ``space``.
    """
    # Walking the home directory is blocking file-system work; run it in a
    # worker thread so the bot keeps serving other updates meanwhile.
    await asyncio.to_thread(get_home_size, start_new_thread=False)
    await space(_update, _context)
@auth_required
async def space(_update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/space``: reply with the last measured home directory size.

    The value comes from ``_LINUX_DIR_SIZE`` (bytes) and is shown in GB with
    two decimals. Updates without a message are ignored.
    """
    message = _update.message
    if message:
        # Bytes -> KB -> MB -> GB.
        size_gb = _LINUX_DIR_SIZE / 1024 / 1024 / 1024
        await message.reply_text(f"Home dir size: {size_gb:.2f}/1000 GB")
@auth_required
async def search(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/search <term>``: query Jackett and reply with the results.

    If the user has an earlier results message recorded in
    ``_LAST_RESULT_DICT``, that message is edited to say it has expired. The
    id of the new results message is then recorded for the user. Updates
    lacking a message text, user or chat are ignored.
    """
    message = update.message
    user = update.effective_user
    chat = update.effective_chat
    if not message or not message.text or not user or not chat:
        return

    # Cut off the whole command token, so "/search@BotName term" (the form
    # used in group chats) does not leak the bot mention into the term.
    tokens = message.text.split(maxsplit=1)
    command_token = tokens[0] if tokens else ""
    term = message.text[len(command_token):]

    error, results = jackett.search(term)
    if error or not results:
        await message.reply_text(error or "No results found")
        return

    user_id = user.id
    if prior_search_msg_id := _LAST_RESULT_DICT.get(user_id):
        try:
            await context.bot.edit_message_text(
                chat_id=chat.id,
                message_id=prior_search_msg_id,
                text=r"Previous search results have expired\. "
                r"Use the latest search results below\.",
                parse_mode="MarkdownV2",
            )
        except TelegramError:
            # Marking the old message as expired is best effort: if it can no
            # longer be edited (e.g. it was deleted), the new results must
            # still be delivered and recorded.
            logging.getLogger(__name__).warning(
                "Could not mark message %s as expired",
                prior_search_msg_id,
                exc_info=True,
            )

    # NOTE(review): presumably this also stores the offered items for the
    # user in _RESULTS so that /get can look them up - confirm in jackett.
    returned_results_str = jackett.format_and_filter_results(
        results,
        user_id,
        _RESULTS,
    )
    results_message = await message.reply_text(returned_results_str)
    _LAST_RESULT_DICT[user_id] = results_message.message_id
@auth_required
async def download(update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/download <magnet>``: hand a magnet link to ruTorrent.

    The text after the command is passed to ``ru_torrent.upload_magnet``
    together with the source label "/download" and the sender's display name;
    the returned text is sent back as the reply. Updates lacking a message
    text or user are ignored.
    """
    message = update.message
    user = update.effective_user
    if not message or not message.text or not user:
        return

    # Cut off the whole command token, so "/download@BotName magnet:..." (the
    # form used in group chats) does not leak the bot mention into the magnet.
    tokens = message.text.split(maxsplit=1)
    command_token = tokens[0] if tokens else ""
    magnet = message.text[len(command_token):]

    username = user.username or user.first_name or "Unknown"
    magnet_upload_result = ru_torrent.upload_magnet(
        magnet,
        "/download",
        username,
    )
    await message.reply_text(magnet_upload_result)
@auth_required
async def get(update: Update, _context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/get<key>``: download an item from the user's search results.

    The text after "/get" is looked up in the sender's entry of ``_RESULTS``.
    An item with a magnet is uploaded as a magnet. Otherwise its link is
    fetched without following redirects: a 302 response has its ``Location``
    header uploaded as a magnet, any other successful response has its body
    uploaded as a torrent file. The upload's returned text is sent back as
    the reply. Commands not starting with "/get" are ignored.
    """
    message = update.message
    user = update.effective_user
    if not message or not message.text or not user:
        return
    if not message.text.startswith("/get"):
        return
    # @auth_required has already verified that the sender is in _USERS.

    users_data = _RESULTS.get(user.id)
    if not users_data:
        await message.reply_text("Not a valid item")
        return
    _, get_id = message.text.split("/get", 1)
    result = users_data.get(get_id)
    if not result:
        await message.reply_text("Not a valid item")
        return

    username = user.username or user.first_name or "Unknown"
    logger = logging.getLogger(__name__)

    if magnet := result.magnet:
        magnet_upload_result = ru_torrent.upload_magnet(magnet, result.source, username, result)
        await message.reply_text(magnet_upload_result)
        return

    if link := result.link:
        fetch_timeout_seconds = 30
        try:
            # Run the blocking HTTP call in a worker thread, and bound it with
            # a timeout so an unresponsive server cannot stall the handler.
            url_response = await asyncio.to_thread(
                requests.get,
                link,
                allow_redirects=False,
                timeout=fetch_timeout_seconds,
            )
        except requests.RequestException:
            logger.exception("Fetching %s failed", link)
            await message.reply_text(
                f"Something went wrong downloading torrent file. The url was: {link}",
            )
            return
        if not url_response.ok:
            await message.reply_text(
                f"Something went wrong downloading torrent file. The url was: {link}",
            )
            return
        try:
            if url_response.status_code == 302:
                # The redirect target is handed to ruTorrent as the magnet.
                magnet_upload_result = ru_torrent.upload_magnet(
                    url_response.headers["Location"], result.source, username, result
                )
                await message.reply_text(magnet_upload_result)
                return
            torrent_upload_result = ru_torrent.upload_torrent(
                url_response.content, result.source, username, result
            )
            await message.reply_text(torrent_upload_result)
            return
        except Exception:
            # Handler boundary: record the traceback, then fall through to
            # the generic failure reply below.
            logger.exception("Uploading item %r failed", get_id)

    await message.reply_text("Something went wrong")
def main() -> None:
    """Build the Telegram application, register the handlers and start polling.

    Raises:
        ValueError: If the ``TELEGRAM_TOKEN`` environment variable is unset or
            empty.
    """
    if not (token := os.getenv("TELEGRAM_TOKEN")):
        raise ValueError("TELEGRAM_TOKEN environment variable is required")

    application = Application.builder().token(token).build()

    # Named commands are listed first; the catch-all command handler for
    # "/get..." messages comes last.
    handlers = [
        CommandHandler("spaceforce", spaceforce),
        CommandHandler("space", space),
        CommandHandler("auth", auth),
        CommandHandler("search", search),
        CommandHandler("download", download),
        MessageHandler(COMMAND, get),
    ]
    application.add_handlers(handlers)

    print("Getting home size!")
    # Initial measurement; with its default argument it also schedules the
    # next one.
    get_home_size()

    print("Starting polling!")
    application.run_polling()
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()