diff --git a/app.py b/app.py
index e4774b3..31f89e1 100644
--- a/app.py
+++ b/app.py
@@ -51,22 +51,36 @@ async def _init_db() -> None:
def _init_logger() -> None:
"""初始化日志记录器。"""
- logging.basicConfig(
- format="%(levelname)s [%(asctime)s] %(name)s - %(message)s",
+ handler = logging.StreamHandler() # 输出到终端
+ formatter = logging.Formatter(
+ fmt="%(levelname)s [%(asctime)s] %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
- level=config.log_level,
- filename="default.log",
)
+ handler.setFormatter(formatter)
+
+ # 添加流处理器
+ logger.addHandler(handler)
+
+ # 设置日志级别
+ logger.setLevel(config.log_level)
+
+ # 如果你需要将日志写入文件,可以继续保持原有的文件配置:
+ file_handler = logging.FileHandler("default.log")
+ file_handler.setFormatter(formatter)
+ logger.addHandler(file_handler)
+
def _init_tz() -> None:
"""初始化时区设置。"""
if config.timezone:
try:
timezone = pytz.timezone(config.timezone)
- now = datetime.now(timezone).strftime('%Y-%m-%d %H:%M:%S')
+ now = datetime.now(timezone).strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"时区已设置为: {config.timezone},当前时间: {now}")
except pytz.UnknownTimeZoneError:
- logger.error(f"无效的时区配置: {config.timezone},请检查 config.timezone 设置。")
+ logger.error(
+ f"无效的时区配置: {config.timezone},请检查 config.timezone 设置。"
+ )
async def setup_bot() -> BotClient:
@@ -94,7 +108,7 @@ async def main() -> None:
_init_logger()
_init_tz()
- now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"程序启动时间: {now}")
await _init_db()
diff --git a/bot/__init__.py b/bot/__init__.py
index b8652cf..7e6f38c 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -18,4 +18,4 @@
"get_user_telegram_id",
"parse_iso8601_to_normal_date",
"parse_timestamp_to_normal_date",
-]
\ No newline at end of file
+]
diff --git a/bot/bot_client.py b/bot/bot_client.py
index 7efbee4..971601f 100644
--- a/bot/bot_client.py
+++ b/bot/bot_client.py
@@ -7,17 +7,14 @@
class BotClient:
def __init__(
- self,
- api_id: str,
- api_hash: str,
- bot_token: str,
- name="emby_bot",
+ self,
+ api_id: str,
+ api_hash: str,
+ bot_token: str,
+ name="emby_bot",
):
self.client = Client(
- name=name,
- api_id=api_id,
- api_hash=api_hash,
- bot_token=bot_token
+ name=name, api_id=api_id, api_hash=api_hash, bot_token=bot_token
)
logger.info(f"Bot client initialized with name: {name}")
@@ -41,4 +38,4 @@ async def idle():
def stop(self):
logger.info("Stopping bot client")
- return self.client.stop()
\ No newline at end of file
+ return self.client.stop()
diff --git a/bot/commands.py b/bot/commands.py
index 2a3d801..c861633 100644
--- a/bot/commands.py
+++ b/bot/commands.py
@@ -1,12 +1,22 @@
import logging
+import functools
from datetime import datetime
from pyrogram import filters
from pyrogram.enums import ParseMode
-from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery, Message
+from pyrogram.types import (
+ InlineKeyboardButton,
+ InlineKeyboardMarkup,
+ CallbackQuery,
+ Message,
+)
from bot.bot_client import BotClient
-from bot.filters import user_in_group_on_filter, admin_user_on_filter, emby_user_on_filter
+from bot.filters import (
+ user_in_group_on_filter,
+ admin_user_on_filter,
+ emby_user_on_filter,
+)
from bot.message_helper import get_user_telegram_id
from bot.utils import parse_iso8601_to_normal_date
from config import config
@@ -41,16 +51,32 @@ def _parse_args(message: Message) -> list[str]:
parts = message.text.strip().split(" ")
return parts[1:] if len(parts) > 1 else []
- async def _ensure_args(self, message: Message, args: list, min_len: int, usage: str):
+ @staticmethod
+ def ensure_args(min_len: int, usage: str):
"""
- 确保命令行参数长度足够,不足则回复用法说明。
+ 装饰器:确保命令行参数长度足够,不足则回复用法说明。
"""
- if len(args) < min_len:
- await self._reply_html(message, f"参数不足,请参考用法:\n{usage}")
- return False
- return True
- async def _send_error(self, message: Message, error: Exception, prefix: str = "操作失败"):
+ def decorator(func):
+ @functools.wraps(func)
+ async def wrapper(self, message, *args, **kwargs):
+ # 从消息中解析参数
+ parsed_args = self._parse_args(message)
+ if len(parsed_args) < min_len:
+ await self._reply_html(
+ message, f"参数不足,请参考用法:\n{usage}"
+ )
+ return
+ # 将解析好的参数传递给目标函数,避免在函数内部再调用 _parse_args
+ return await func(self, message, parsed_args, *args, **kwargs)
+
+ return wrapper
+
+ return decorator
+
+ async def _send_error(
+ self, message: Message, error: Exception, prefix: str = "操作失败"
+ ):
"""
统一的异常捕获后回复方式。
"""
@@ -59,22 +85,22 @@ async def _send_error(self, message: Message, error: Exception, prefix: str = "
# =============== 各类命令逻辑 ===============
- async def create_user(self, message: Message):
+ @ensure_args(1, "/create <用户名>")
+ async def create_user(self, message: Message, args: list[str]):
"""
/create <用户名>
"""
- args = self._parse_args(message)
- if not await self._ensure_args(message, args, 1, "/create <用户名>"):
- return
emby_name = args[0]
try:
default_password = self.user_service.gen_default_passwd()
- user = await self.user_service.emby_create_user(message.from_user.id, emby_name, default_password)
+ user = await self.user_service.emby_create_user(
+ message.from_user.id, emby_name, default_password
+ )
if user and user.has_emby_account():
await self._reply_html(
message,
- f"✅ 创建用户成功。\n初始密码:{default_password}"
+ f"✅ 创建用户成功。\n初始密码:{default_password}",
)
else:
await self._reply_html(message, "❌ 创建用户失败,请稍后重试。")
@@ -89,10 +115,17 @@ async def info(self, message: Message):
telegram_id = await get_user_telegram_id(self.bot_client.client, message)
try:
user, emby_info = await self.user_service.emby_info(telegram_id)
- last_active = (parse_iso8601_to_normal_date(emby_info.get("LastActivityDate"))
- if emby_info.get("LastActivityDate") else "无")
- date_created = parse_iso8601_to_normal_date(emby_info.get("DateCreated", ""))
- ban_status = "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用"
+ last_active = (
+ parse_iso8601_to_normal_date(emby_info.get("LastActivityDate"))
+ if emby_info.get("LastActivityDate")
+ else "无"
+ )
+ date_created = parse_iso8601_to_normal_date(
+ emby_info.get("DateCreated", "")
+ )
+ ban_status = (
+ "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用"
+ )
reply_text = (
f"👤 用户信息:\n"
@@ -105,7 +138,9 @@ async def info(self, message: Message):
)
if user.ban_time and user.ban_time > 0:
- ban_time = datetime.fromtimestamp(user.ban_time).strftime('%Y-%m-%d %H:%M:%S')
+ ban_time = datetime.fromtimestamp(user.ban_time).strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
reply_text += f"• 被ban时间:{ban_time}\n"
if user.reason:
reply_text += f"• 被ban原因:{user.reason}\n"
@@ -114,13 +149,11 @@ async def info(self, message: Message):
except Exception as e:
await self._send_error(message, e, prefix="查询失败")
- async def use_code(self, message: Message):
+ @ensure_args(1, "/use_code <邀请码>")
+ async def use_code(self, message: Message, args: list[str]):
"""
/use_code <邀请码>
"""
- args = self._parse_args(message)
- if not await self._ensure_args(message, args, 1, "/use_code <邀请码>"):
- return
code = args[0]
telegram_id = message.from_user.id
@@ -130,14 +163,18 @@ async def use_code(self, message: Message):
return await self._reply_html(message, "❌ 邀请码使用失败")
# 根据类型给出不同的回复
if used_code.code_type == InviteCodeType.REGISTER:
- await self._reply_html(message, "✅ 邀请码使用成功,您已获得创建账号资格")
+ await self._reply_html(
+ message, "✅ 邀请码使用成功,您已获得创建账号资格"
+ )
else:
await self._reply_html(message, "✅ 邀请码使用成功,您已获得白名单资格")
# 如果该邀请码在bot中记录了消息,需要删除
if self.code_to_message_id.get(code):
code_to_message_id = self.code_to_message_id[code]
- await self.bot_client.client.delete_messages(code_to_message_id[0], code_to_message_id[1])
+ await self.bot_client.client.delete_messages(
+ code_to_message_id[0], code_to_message_id[1]
+ )
del self.code_to_message_id[code]
except Exception as e:
await self._send_error(message, e, prefix="邀请码使用失败")
@@ -148,10 +185,12 @@ async def reset_emby_password(self, message: Message):
"""
default_password = self.user_service.gen_default_passwd()
try:
- if await self.user_service.reset_password(message.from_user.id, default_password):
+ if await self.user_service.reset_password(
+ message.from_user.id, default_password
+ ):
await self._reply_html(
message,
- f"✅ 密码重置成功。\n新密码:{default_password}"
+ f"✅ 密码重置成功。\n新密码:{default_password}",
)
else:
await self._reply_html(message, "❌ 密码重置失败,请稍后重试。")
@@ -168,11 +207,15 @@ async def new_code(self, message: Message):
try:
num = int(args[0])
except ValueError:
- return await self._reply_html(message, "❌ 请输入有效数量 /new_code [整数]")
+ return await self._reply_html(
+ message, "❌ 请输入有效数量 /new_code [整数]"
+ )
num = min(num, 20)
try:
- code_list = await self.user_service.create_invite_code(message.from_user.id, num)
+ code_list = await self.user_service.create_invite_code(
+ message.from_user.id, num
+ )
for code_obj in code_list:
message_text = f"📌 邀请码:\n点击复制👉{code_obj.code}"
if message.reply_to_message is not None:
@@ -188,10 +231,7 @@ async def new_code(self, message: Message):
)
await self._reply_html(message, "✅ 已发送邀请码")
else:
- msg = await self._reply_html(
- message,
- message_text
- )
+ msg = await self._reply_html(message, message_text)
self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id)
except Exception as e:
await self._send_error(message, e, prefix="创建邀请码失败")
@@ -206,13 +246,19 @@ async def new_whitelist_code(self, message: Message):
try:
num = int(args[0])
except ValueError:
- return await self._reply_html(message, "❌ 请输入有效数量 /new_whitelist_code [整数]")
+ return await self._reply_html(
+ message, "❌ 请输入有效数量 /new_whitelist_code [整数]"
+ )
num = min(num, 20)
try:
- code_list = await self.user_service.create_whitelist_code(message.from_user.id, num)
+ code_list = await self.user_service.create_whitelist_code(
+ message.from_user.id, num
+ )
for code_obj in code_list:
- message_text = f"📌 白名单邀请码:\n点击复制👉{code_obj.code}"
+ message_text = (
+ f"📌 白名单邀请码:\n点击复制👉{code_obj.code}"
+ )
if message.reply_to_message is not None:
await self.bot_client.client.send_message(
chat_id=message.from_user.id,
@@ -226,10 +272,7 @@ async def new_whitelist_code(self, message: Message):
)
await self._reply_html(message, "✅ 已发送邀请码")
else:
- msg = await self._reply_html(
- message,
- message_text
- )
+ msg = await self._reply_html(message, message_text)
self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id)
except Exception as e:
await self._send_error(message, e, prefix="创建白名单邀请码失败")
@@ -246,8 +289,7 @@ async def ban_emby(self, message: Message):
try:
if await self.user_service.emby_ban(telegram_id, reason, operator_id):
await self._reply_html(
- message,
- f"✅ 已禁用用户 {telegram_id} 的Emby账号"
+ message, f"✅ 已禁用用户 {telegram_id} 的Emby账号"
)
else:
await self._reply_html(message, "❌ 禁用失败,请稍后重试。")
@@ -263,8 +305,7 @@ async def unban_emby(self, message: Message):
try:
if await self.user_service.emby_unban(telegram_id, operator_id):
await self._reply_html(
- message,
- f"✅ 已解禁用户 {telegram_id} 的Emby账号"
+ message, f"✅ 已解禁用户 {telegram_id} 的Emby账号"
)
else:
await self._reply_html(message, "❌ 解禁失败,请稍后重试。")
@@ -278,22 +319,32 @@ async def select_line(self, message: Message):
"""
try:
telegram_id = message.from_user.id
- router_list = config.router_list or await self.user_service.get_router_list(telegram_id)
+ router_list = config.router_list or await self.user_service.get_router_list(
+ telegram_id
+ )
# 缓存到 config 中,减少重复获取
if router_list and not config.router_list:
config.router_list = router_list
user_router = await self.user_service.get_user_router(telegram_id)
- user_router_index = user_router.get('index', '')
+ user_router_index = user_router.get("index", "")
message_text = f"当前线路:{user_router_index}\n请选择线路:"
message_buttons = []
for router in router_list:
- index = router.get('index')
- name = router.get('name')
+ index = router.get("index")
+ name = router.get("name")
# 已选线路高亮
- button_text = f"🔵 {name}" if index == user_router_index else f"⚪ {name}"
- message_buttons.append([InlineKeyboardButton(button_text, callback_data=f"SELECTROUTE_{index}")])
+ button_text = (
+ f"🔵 {name}" if index == user_router_index else f"⚪ {name}"
+ )
+ message_buttons.append(
+ [
+ InlineKeyboardButton(
+ button_text, callback_data=f"SELECTROUTE_{index}"
+ )
+ ]
+ )
keyboard = InlineKeyboardMarkup(message_buttons)
await self._reply_html(message, message_text, reply_markup=keyboard)
@@ -307,8 +358,14 @@ async def group_member_change_handler(self, clent, message: Message):
if message.left_chat_member:
left_member_id = message.left_chat_member.id
left_member = await self.user_service.must_get_user(left_member_id)
- if left_member.has_emby_account() and not left_member.is_emby_baned() and not left_member.is_whitelist:
- await self.user_service.emby_ban(message.left_chat_member.id, "用户已退出群组")
+ if (
+ left_member.has_emby_account()
+ and not left_member.is_emby_baned()
+ and not left_member.is_whitelist
+ ):
+ await self.user_service.emby_ban(
+ message.left_chat_member.id, "用户已退出群组"
+ )
config.group_members.pop(message.left_chat_member.id, None)
if message.new_chat_members:
for new_member in message.new_chat_members:
@@ -318,20 +375,24 @@ async def handle_callback_query(self, client, callback_query: CallbackQuery):
"""
回调按钮事件统一处理,如切换线路。
"""
- data = callback_query.data.split('_')
- if data[0] == 'SELECTROUTE':
+ data = callback_query.data.split("_")
+ if data[0] == "SELECTROUTE":
index = data[1]
try:
if not config.router_list:
await callback_query.answer("尚未加载线路列表,请稍后重试")
return
- selected_router = next((r for r in config.router_list if r['index'] == index), None)
+ selected_router = next(
+ (r for r in config.router_list if r["index"] == index), None
+ )
if not selected_router:
await callback_query.answer("线路不存在")
return
- await self.user_service.update_user_router(callback_query.from_user.id, index)
+ await self.user_service.update_user_router(
+ callback_query.from_user.id, index
+ )
await callback_query.answer("线路已更新")
await callback_query.message.edit(
f"已选择 {selected_router['name']}\n"
@@ -357,19 +418,17 @@ async def count(self, message: Message):
f"🎬 电影数量:{count_data.get('MovieCount', 0)}\n"
f"📽️ 剧集数量:{count_data.get('SeriesCount', 0)}\n"
f"🎞️ 总集数:{count_data.get('EpisodeCount', 0)}\n"
- )
+ ),
)
except Exception as e:
await self._send_error(message, e, prefix="查询失败")
- async def register_until(self, message: Message):
+ @ensure_args(2, "/register_until 2023-10-01 12:00:00")
+ async def register_until(self, message: Message, args: list[str]):
"""
/register_until <时间: YYYY-MM-DD HH:MM:SS>
限时开放注册
"""
- args = self._parse_args(message)
- if not await self._ensure_args(message, args, 2, "/register_until 2023-10-01 12:00:00"):
- return
time_str = " ".join(args)
try:
@@ -378,24 +437,30 @@ async def register_until(self, message: Message):
if time < now:
return await self._reply_html(message, "❌ 时间必须晚于当前时间")
- await self.user_service.set_emby_config(message.from_user.id, register_public_time=int(time.timestamp()))
- await self._reply_html(message, f"✅ 已开放注册,截止时间:{time_str}")
+ await self.user_service.set_emby_config(
+ message.from_user.id, register_public_time=int(time.timestamp())
+ )
+ await self._reply_html(
+ message, f"✅ 已开放注册,截止时间:{time_str}"
+ )
except Exception as e:
await self._send_error(message, e, prefix="开放注册失败")
- async def register_amount(self, message: Message):
+ @ensure_args(1, "/register_amount <人数>")
+ async def register_amount(self, message: Message, args: list[str]):
"""
/register_amount <人数>
开放指定数量的注册名额
"""
- args = self._parse_args(message)
- if not await self._ensure_args(message, args, 1, "/register_amount <人数>"):
- return
try:
amount = int(args[0])
- await self.user_service.set_emby_config(message.from_user.id, register_public_user=amount)
- await self._reply_html(message, f"✅ 已开放注册,名额:{amount}")
+ await self.user_service.set_emby_config(
+ message.from_user.id, register_public_user=amount
+ )
+ await self._reply_html(
+ message, f"✅ 已开放注册,名额:{amount}"
+ )
except Exception as e:
await self._send_error(message, e, prefix="开放注册失败")
@@ -429,59 +494,87 @@ async def help_command(self, message: Message):
# =============== 命令挂载 ===============
def setup_commands(self):
- @self.bot_client.client.on_message(filters.private & filters.command(["help", "start"]))
+ @self.bot_client.client.on_message(
+ filters.private & filters.command(["help", "start"])
+ )
async def c_help(client, message):
await self.help_command(message)
- @self.bot_client.client.on_message(filters.command("count") & user_in_group_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("count") & user_in_group_on_filter
+ )
async def c_count(client, message):
await self.count(message)
- @self.bot_client.client.on_message(filters.command("info") & user_in_group_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("info") & user_in_group_on_filter
+ )
async def c_info(client, message):
await self.info(message)
- @self.bot_client.client.on_message(filters.private & filters.command("use_code") & user_in_group_on_filter)
+ @self.bot_client.client.on_message(
+ filters.private & filters.command("use_code") & user_in_group_on_filter
+ )
async def c_use_code(client, message):
await self.use_code(message)
- @self.bot_client.client.on_message(filters.private & filters.command("create") & user_in_group_on_filter)
+ @self.bot_client.client.on_message(
+ filters.private & filters.command("create") & user_in_group_on_filter
+ )
async def c_create_user(client, message):
await self.create_user(message)
@self.bot_client.client.on_message(
- filters.private & filters.command("reset_emby_password") & user_in_group_on_filter & emby_user_on_filter
+ filters.private
+ & filters.command("reset_emby_password")
+ & user_in_group_on_filter
+ & emby_user_on_filter
)
async def c_reset_emby_password(client, message):
await self.reset_emby_password(message)
@self.bot_client.client.on_message(
- filters.private & filters.command("select_line") & user_in_group_on_filter & emby_user_on_filter
+ filters.private
+ & filters.command("select_line")
+ & user_in_group_on_filter
+ & emby_user_on_filter
)
async def c_select_line(client, message):
await self.select_line(message)
- @self.bot_client.client.on_message(filters.command("new_code") & admin_user_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("new_code") & admin_user_on_filter
+ )
async def c_new_code(client, message):
await self.new_code(message)
- @self.bot_client.client.on_message(filters.command("new_whitelist_code") & admin_user_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("new_whitelist_code") & admin_user_on_filter
+ )
async def c_new_whitelist_code(client, message):
await self.new_whitelist_code(message)
- @self.bot_client.client.on_message(filters.command("ban_emby") & admin_user_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("ban_emby") & admin_user_on_filter
+ )
async def c_ban_emby(client, message):
await self.ban_emby(message)
- @self.bot_client.client.on_message(filters.command("unban_emby") & admin_user_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("unban_emby") & admin_user_on_filter
+ )
async def c_unban_emby(client, message):
await self.unban_emby(message)
- @self.bot_client.client.on_message(filters.command("register_until") & admin_user_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("register_until") & admin_user_on_filter
+ )
async def c_register_until(client, message):
await self.register_until(message)
- @self.bot_client.client.on_message(filters.command("register_amount") & admin_user_on_filter)
+ @self.bot_client.client.on_message(
+ filters.command("register_amount") & admin_user_on_filter
+ )
async def c_register_amount(client, message):
await self.register_amount(message)
@@ -489,6 +582,8 @@ async def c_register_amount(client, message):
async def c_select_line_cb(client, callback_query):
await self.handle_callback_query(client, callback_query)
- @self.bot_client.client.on_message(filters.left_chat_member | filters.new_chat_members)
+ @self.bot_client.client.on_message(
+ filters.left_chat_member | filters.new_chat_members
+ )
async def group_member_change_handler(client, message):
- await self.group_member_change_handler(client, message)
\ No newline at end of file
+ await self.group_member_change_handler(client, message)
diff --git a/bot/filters.py b/bot/filters.py
index a9319fb..5284478 100644
--- a/bot/filters.py
+++ b/bot/filters.py
@@ -14,7 +14,7 @@ async def user_in_group_on_filter(filter, client, update) -> bool:
if config.group_members and telegram_id in config.group_members:
logger.debug(f"User {telegram_id} is in group")
return True
- if config.channel_members and telegram_id in config.channel_members:
+ if config.channel_members and telegram_id in config.channel_members:
logger.debug(f"User {telegram_id} is in channel")
return True
@@ -31,7 +31,9 @@ async def admin_user_on_filter(filter, client, update) -> bool:
logger.debug(f"User {telegram_id} is an admin")
return True
except Exception as e:
- logger.error(f"Error checking admin status for user {telegram_id}: {e}", exc_info=True)
+ logger.error(
+ f"Error checking admin status for user {telegram_id}: {e}", exc_info=True
+ )
return False
logger.debug(f"User {telegram_id} is not an admin")
@@ -47,13 +49,15 @@ async def emby_user_on_filter(filter, client, update) -> bool:
logger.debug(f"User {telegram_id} is an Emby user")
return True
except Exception as e:
- logger.error(f"Error checking Emby status for user {telegram_id}: {e}", exc_info=True)
+ logger.error(
+ f"Error checking Emby status for user {telegram_id}: {e}", exc_info=True
+ )
return False
logger.debug(f"User {telegram_id} is not an Emby user")
return False
-user_in_group_on_filter = create(user_in_group_on_filter, 'user_in_group_on_filter')
-admin_user_on_filter = create(admin_user_on_filter, 'admin_user_on_filter')
-emby_user_on_filter = create(emby_user_on_filter, 'emby_user_on_filter')
\ No newline at end of file
+user_in_group_on_filter = create(user_in_group_on_filter, "user_in_group_on_filter")
+admin_user_on_filter = create(admin_user_on_filter, "admin_user_on_filter")
+emby_user_on_filter = create(emby_user_on_filter, "emby_user_on_filter")
diff --git a/bot/message_helper.py b/bot/message_helper.py
index 54f5c57..4efa283 100644
--- a/bot/message_helper.py
+++ b/bot/message_helper.py
@@ -36,7 +36,9 @@ async def get_user_telegram_id(client, message):
try:
user = await client.get_users(telegram_username)
telegram_id = user.id
- logger.debug(f"Telegram ID resolved from username {telegram_username}: {telegram_id}")
+ logger.debug(
+ f"Telegram ID resolved from username {telegram_username}: {telegram_id}"
+ )
except UsernameNotOccupied:
error_message = f"❌ 用户名 @{telegram_username} 不存在"
logger.warning(f"Username not occupied: {telegram_username}")
@@ -48,7 +50,10 @@ async def get_user_telegram_id(client, message):
await message.reply(error_message)
return None
except Exception as e:
- logger.error(f"Error getting user ID from username {telegram_username}: {e}", exc_info=True)
+ logger.error(
+ f"Error getting user ID from username {telegram_username}: {e}",
+ exc_info=True,
+ )
return None
- return telegram_id
\ No newline at end of file
+ return telegram_id
diff --git a/bot/utils.py b/bot/utils.py
index f5698da..98abc64 100644
--- a/bot/utils.py
+++ b/bot/utils.py
@@ -7,11 +7,15 @@
def parse_iso8601(datetime_str: str):
# 解析字符串为 datetime 对象
try:
- dt = datetime.strptime(datetime_str[:26], "%Y-%m-%dT%H:%M:%S.%f") # 截取到微秒部分
+ dt = datetime.strptime(
+ datetime_str[:26], "%Y-%m-%dT%H:%M:%S.%f"
+ ) # 截取到微秒部分
logger.debug(f"Parsed ISO8601 datetime string: {datetime_str}")
return dt
except Exception as e:
- logger.error(f"Error parsing ISO8601 datetime string: {datetime_str}: {e}", exc_info=True)
+ logger.error(
+ f"Error parsing ISO8601 datetime string: {datetime_str}: {e}", exc_info=True
+ )
return None
@@ -36,4 +40,4 @@ def parse_timestamp_to_normal_date(timestamp: int):
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
logger.error(f"Error parsing timestamp {timestamp}: {e}", exc_info=True)
- return None
\ No newline at end of file
+ return None
diff --git a/config.py b/config.py
index 42f623c..3290b57 100644
--- a/config.py
+++ b/config.py
@@ -7,6 +7,7 @@
logger = logging.getLogger(__name__)
+
class Config:
def __init__(self):
self.timezone = os.getenv("TIMEZONE")
@@ -14,7 +15,9 @@ def __init__(self):
self.bot_token = os.getenv("BOT_TOKEN")
self.api_id = os.getenv("API_ID")
self.api_hash = os.getenv("API_HASH")
- self.telegram_group_ids = list(map(int, os.getenv("TELEGRAM_GROUP_ID").split(",")))
+ self.telegram_group_ids = list(
+ map(int, os.getenv("TELEGRAM_GROUP_ID").split(","))
+ )
self.emby_url = os.getenv("EMBY_URL")
self.emby_api = os.getenv("EMBY_API_KEY")
self.api_url = os.getenv("API_URL")
@@ -34,4 +37,4 @@ def __init__(self):
# 实例化并提供配置对象
config = Config()
-logger.debug(f"Admin list: {config.admin_list}")
\ No newline at end of file
+logger.debug(f"Admin list: {config.admin_list}")
diff --git a/core/emby_api.py b/core/emby_api.py
index 2a64611..b15ff0c 100644
--- a/core/emby_api.py
+++ b/core/emby_api.py
@@ -15,10 +15,12 @@ def __init__(self, emby_url: str, emby_api: str, timeout: int = 10):
:param emby_api: Emby 服务器的 API Key
:param timeout: 每次请求的超时时间,默认为 10 秒
"""
- self.base_url: str = emby_url.rstrip('/')
+ self.base_url: str = emby_url.rstrip("/")
self.api_key: str = emby_api
self.timeout: int = timeout
- logger.info(f"EmbyApi initialized with URL: {self.base_url}, timeout: {self.timeout}")
+ logger.info(
+ f"EmbyApi initialized with URL: {self.base_url}, timeout: {self.timeout}"
+ )
def _request(self, method: str, path: str, data=None, params=None):
"""
@@ -35,16 +37,24 @@ def _request(self, method: str, path: str, data=None, params=None):
"Authorization": f"Token={self.api_key}",
"X-Emby-Authorization": f"Token={self.api_key}",
"User-Agent": "sadasd",
- "Accept-Language": "zh-CN,zh-Hans;q=0.9", "Content-Type": "application/json", "Accept": "*/*"
+ "Accept-Language": "zh-CN,zh-Hans;q=0.9",
+ "Content-Type": "application/json",
+ "Accept": "*/*",
}
url = f"{self.base_url}{path}"
- logger.debug(f"Making {method} request to {url} with params: {params}, data: {data}")
+ logger.debug(
+ f"Making {method} request to {url} with params: {params}, data: {data}"
+ )
try:
- if method.upper() == 'GET':
- response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
- elif method.upper() == 'POST':
- response = requests.post(url, params=params, json=data, timeout=self.timeout, headers=headers)
+ if method.upper() == "GET":
+ response = requests.get(
+ url, params=params, timeout=self.timeout, headers=headers
+ )
+ elif method.upper() == "POST":
+ response = requests.post(
+ url, params=params, json=data, timeout=self.timeout, headers=headers
+ )
else:
raise Exception(f"暂不支持的 HTTP 方法: {method}")
@@ -58,7 +68,9 @@ def _request(self, method: str, path: str, data=None, params=None):
raise Exception(f"无法连接到 Emby 服务器: {str(e)}")
except requests.exceptions.RequestException as e:
# 其他 requests 异常
- logger.error(f"An unknown error occurred while requesting Emby: {e}", exc_info=True)
+ logger.error(
+ f"An unknown error occurred while requesting Emby: {e}", exc_info=True
+ )
raise Exception(f"请求 Emby 时发生未知错误: {str(e)}")
try:
@@ -78,9 +90,11 @@ def get_user(self, emby_id: str):
path = f"/emby/Users/{emby_id}"
logger.info(f"Getting user with Emby ID: {emby_id}")
try:
- return self._request('GET', path)
+ return self._request("GET", path)
except Exception as e:
- logger.error(f"Failed to get user with Emby ID {emby_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to get user with Emby ID {emby_id}: {e}", exc_info=True
+ )
raise
def create_user(self, name: str):
@@ -93,7 +107,7 @@ def create_user(self, name: str):
data = {"Name": name, "HasPassword": False}
logger.info(f"Creating user with name: {name}")
try:
- return self._request('POST', path, data=data)
+ return self._request("POST", path, data=data)
except Exception as e:
logger.error(f"Failed to create user with name {name}: {e}", exc_info=True)
raise
@@ -126,13 +140,15 @@ def ban_user(self, emby_id: str):
"EnableMediaConversion": False,
"EnableAllDevices": True,
"AllowCameraUpload": False,
- "SimultaneousStreamLimit": 0
+ "SimultaneousStreamLimit": 0,
}
logger.info(f"Banning user with Emby ID: {emby_id}")
try:
return self.update_user_policy(emby_id, data)
except Exception as e:
- logger.error(f"Failed to ban user with Emby ID {emby_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to ban user with Emby ID {emby_id}: {e}", exc_info=True
+ )
raise
def set_default_policy(self, emby_id: str):
@@ -163,13 +179,16 @@ def set_default_policy(self, emby_id: str):
"EnableMediaConversion": False,
"EnableAllDevices": True,
"AllowCameraUpload": False,
- "SimultaneousStreamLimit": 3
+ "SimultaneousStreamLimit": 3,
}
logger.info(f"Setting default policy for user with Emby ID: {emby_id}")
try:
return self.update_user_policy(emby_id, data)
except Exception as e:
- logger.error(f"Failed to set default policy for user with Emby ID {emby_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to set default policy for user with Emby ID {emby_id}: {e}",
+ exc_info=True,
+ )
raise
def update_user_policy(self, emby_id: str, policy_data: dict):
@@ -180,11 +199,16 @@ def update_user_policy(self, emby_id: str, policy_data: dict):
:return: 成功返回更新后的用户信息 JSON,失败抛出异常
"""
path = f"/emby/Users/{emby_id}/Policy"
- logger.info(f"Updating user policy for Emby ID: {emby_id} with data: {policy_data}")
+ logger.info(
+ f"Updating user policy for Emby ID: {emby_id} with data: {policy_data}"
+ )
try:
- return self._request('POST', path, data=policy_data)
+ return self._request("POST", path, data=policy_data)
except Exception as e:
- logger.error(f"Failed to update user policy for Emby ID {emby_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to update user policy for Emby ID {emby_id}: {e}",
+ exc_info=True,
+ )
raise
def reset_user_password(self, emby_id: str):
@@ -197,9 +221,12 @@ def reset_user_password(self, emby_id: str):
data = {"ResetPassword": True}
logger.info(f"Resetting password for user with Emby ID: {emby_id}")
try:
- return self._request('POST', path, data=data)
+ return self._request("POST", path, data=data)
except Exception as e:
- logger.error(f"Failed to reset password for user with Emby ID {emby_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to reset password for user with Emby ID {emby_id}: {e}",
+ exc_info=True,
+ )
raise
def set_user_password(self, emby_id: str, new_pass: str):
@@ -213,9 +240,12 @@ def set_user_password(self, emby_id: str, new_pass: str):
data = {"ResetPassword": False, "CurrentPw": "", "NewPw": new_pass}
logger.info(f"Setting password for user with Emby ID: {emby_id}")
try:
- return self._request('POST', path, data=data)
+ return self._request("POST", path, data=data)
except Exception as e:
- logger.error(f"Failed to set password for user with Emby ID {emby_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to set password for user with Emby ID {emby_id}: {e}",
+ exc_info=True,
+ )
raise
def check_emby_site(self) -> bool:
@@ -226,7 +256,7 @@ def check_emby_site(self) -> bool:
path = "/emby/System/Info"
logger.info("Checking Emby site availability")
try:
- self._request('GET', path)
+ self._request("GET", path)
return True
except Exception as e:
logger.warning(f"Emby site check failed: {e}", exc_info=True)
@@ -242,7 +272,7 @@ def count(self):
path = "/emby/Items/Counts"
logger.info("Getting Emby item counts")
try:
- return self._request('GET', path)
+ return self._request("GET", path)
except Exception as e:
logger.error(f"Failed to get Emby item counts: {e}", exc_info=True)
raise
@@ -259,10 +289,12 @@ def __init__(self, api_url: str, api_key: str = "", timeout: int = 10):
:param api_key: 路由服务使用的Token(如果需要鉴权)
:param timeout: 请求超时,默认为10秒
"""
- self.api_url = api_url.rstrip('/')
+ self.api_url = api_url.rstrip("/")
self.api_key = api_key
self.timeout = timeout
- logger.info(f"EmbyRouterAPI initialized with URL: {self.api_url}, timeout: {self.timeout}")
+ logger.info(
+ f"EmbyRouterAPI initialized with URL: {self.api_url}, timeout: {self.timeout}"
+ )
def call_api(self, path: str):
"""
@@ -271,7 +303,7 @@ def call_api(self, path: str):
:return: 成功时返回 JSON,失败抛出异常
"""
url = f"{self.api_url}{path}"
- headers = {'Authorization': f'Bearer {self.api_key}'} if self.api_key else {}
+ headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
logger.debug(f"Calling API at {url}")
try:
response = requests.get(url, headers=headers, timeout=self.timeout)
@@ -284,7 +316,10 @@ def call_api(self, path: str):
logger.error(f"Failed to connect to router service: {e}", exc_info=True)
raise Exception(f"无法连接到路由服务: {str(e)}")
except requests.exceptions.RequestException as e:
- logger.error(f"An unknown error occurred while requesting router service: {e}", exc_info=True)
+ logger.error(
+ f"An unknown error occurred while requesting router service: {e}",
+ exc_info=True,
+ )
raise Exception(f"请求路由服务时发生错误: {str(e)}")
def query_all_route(self):
@@ -293,7 +328,7 @@ def query_all_route(self):
"""
logger.info("Querying all routes")
try:
- return self.call_api('/api/route')
+ return self.call_api("/api/route")
except Exception as e:
logger.error(f"Failed to query all routes: {e}", exc_info=True)
raise
@@ -304,9 +339,11 @@ def query_user_route(self, user_id: str):
"""
logger.info(f"Querying user route for user ID: {user_id}")
try:
- return self.call_api(f'/api/route/{user_id}')
+ return self.call_api(f"/api/route/{user_id}")
except Exception as e:
- logger.error(f"Failed to query user route for user ID {user_id}: {e}", exc_info=True)
+ logger.error(
+ f"Failed to query user route for user ID {user_id}: {e}", exc_info=True
+ )
raise
def update_user_route(self, user_id: str, new_index: str):
@@ -315,7 +352,10 @@ def update_user_route(self, user_id: str, new_index: str):
"""
logger.info(f"Updating user route for user ID: {user_id} to index: {new_index}")
try:
- return self.call_api(f'/api/route/{user_id}/{new_index}')
+ return self.call_api(f"/api/route/{user_id}/{new_index}")
except Exception as e:
- logger.error(f"Failed to update user route for user ID {user_id} to index {new_index}: {e}", exc_info=True)
- raise
\ No newline at end of file
+ logger.error(
+ f"Failed to update user route for user ID {user_id} to index {new_index}: {e}",
+ exc_info=True,
+ )
+ raise
diff --git a/models/__init__.py b/models/__init__.py
index 09b0f34..d128db6 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -1,4 +1,3 @@
from .user_model import User
from .config_model import Config
from .invite_code_model import InviteCode
-
diff --git a/models/config_model.py b/models/config_model.py
index 7b89ad1..0b269a2 100644
--- a/models/config_model.py
+++ b/models/config_model.py
@@ -6,15 +6,19 @@
logger = logging.getLogger(__name__)
+
class Config(BaseOrmTableWithTS):
- __tablename__ = 'config'
+ __tablename__ = "config"
total_register_user: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
- register_public_user: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+ register_public_user: Mapped[int] = mapped_column(
+ Integer, nullable=False, default=0
+ )
register_public_time: Mapped[int] = mapped_column(BigInteger, nullable=True)
class ConfigOrm(DBManager):
orm_table = Config
-logger.info("Config model initialized")
\ No newline at end of file
+
+logger.info("Config model initialized")
diff --git a/models/invite_code_model.py b/models/invite_code_model.py
index 4860201..3705697 100644
--- a/models/invite_code_model.py
+++ b/models/invite_code_model.py
@@ -7,31 +7,41 @@
logger = logging.getLogger(__name__)
+
class InviteCodeType(enum.Enum):
- REGISTER = 'register' # 注册邀请码
- WHITELIST = 'whitelist' # 白名单邀请码
+ REGISTER = "register" # 注册邀请码
+ WHITELIST = "whitelist" # 白名单邀请码
def __str__(self):
return self.value
class InviteCode(BaseOrmTableWithTS):
- __tablename__ = 'invite_code'
+ __tablename__ = "invite_code"
- code: Mapped[str] = mapped_column(String(50), index=True, unique=True, nullable=False)
+ code: Mapped[str] = mapped_column(
+ String(50), index=True, unique=True, nullable=False
+ )
telegram_id: Mapped[int] = mapped_column(BigInteger, index=True, nullable=False)
- code_type: Mapped[InviteCodeType] = mapped_column(Enum(InviteCodeType), nullable=False)
+ code_type: Mapped[InviteCodeType] = mapped_column(
+ Enum(InviteCodeType), nullable=False
+ )
is_used: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
used_time: Mapped[int] = mapped_column(BigInteger, default=None, nullable=True)
- used_user_id: Mapped[int] = mapped_column(BigInteger, default=None, nullable=True, index=True)
+ used_user_id: Mapped[int] = mapped_column(
+ BigInteger, default=None, nullable=True, index=True
+ )
def __repr__(self):
- return (f"")
+ return (
+ f""
+ )
class InviteCodeOrm(DBManager):
orm_table = InviteCode
-logger.info("InviteCode model initialized")
\ No newline at end of file
+
+logger.info("InviteCode model initialized")
diff --git a/models/user_model.py b/models/user_model.py
index 21cda65..8d4069a 100644
--- a/models/user_model.py
+++ b/models/user_model.py
@@ -8,16 +8,23 @@
logger = logging.getLogger(__name__)
+
class User(BaseOrmTableWithTS):
- __tablename__ = 'user'
+ __tablename__ = "user"
- telegram_id: Mapped[int] = mapped_column(BigInteger, index=True, unique=True, nullable=False)
+ telegram_id: Mapped[int] = mapped_column(
+ BigInteger, index=True, unique=True, nullable=False
+ )
telegram_name: Mapped[str] = mapped_column(String(100), nullable=True)
emby_name: Mapped[str] = mapped_column(String(50), nullable=True)
- emby_id: Mapped[str] = mapped_column(String(50), index=True, unique=True, nullable=True)
+ emby_id: Mapped[str] = mapped_column(
+ String(50), index=True, unique=True, nullable=True
+ )
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
is_whitelist: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
- enable_register: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
+ enable_register: Mapped[bool] = mapped_column(
+ Boolean, default=False, nullable=False
+ )
ban_time: Mapped[int] = mapped_column(nullable=True)
reason: Mapped[str] = mapped_column(String(100), nullable=True)
@@ -104,4 +111,5 @@ def emby_ban_info(self) -> tuple[int, str]:
class UserOrm(DBManager):
orm_table = User
-logger.info("User model initialized")
\ No newline at end of file
+
+logger.info("User model initialized")
diff --git a/services/user_service.py b/services/user_service.py
index 40e073f..c460761 100644
--- a/services/user_service.py
+++ b/services/user_service.py
@@ -33,7 +33,9 @@ async def get_or_create_user_by_telegram_id(telegram_id: int) -> User:
default_user = User(
telegram_id=telegram_id,
is_admin=telegram_id in config.admin_list,
- telegram_name=config.group_members.get(telegram_id, {}).get('username'),
+ telegram_name=config.group_members.get(telegram_id, {}).username
+ if config.group_members.get(telegram_id)
+ else None,
)
user_id = await UserOrm().add(default_user)
user = default_user
@@ -62,7 +64,9 @@ async def must_get_emby_user(self, telegram_id: int) -> User:
raise Exception("该用户的 Emby 账号已被禁用,无法执行此操作。")
return user
- async def _emby_create_user(self, telegram_id: int, username: str, password: str) -> User:
+ async def _emby_create_user(
+ self, telegram_id: int, username: str, password: str
+ ) -> User:
"""内部使用:真正调用 Emby API 创建用户,并设置初始密码"""
user = await self.get_or_create_user_by_telegram_id(telegram_id)
emby_user = self.emby_api.create_user(username)
@@ -82,38 +86,46 @@ async def _emby_create_user(self, telegram_id: int, username: str, password: str
@staticmethod
def gen_default_passwd() -> str:
"""生成默认密码:随机6位的字母数字组合"""
- return ''.join(sample(string.ascii_letters + string.digits, 6))
+ return "".join(sample(string.ascii_letters + string.digits, 6))
@staticmethod
def gen_register_code(num: int) -> List[str]:
"""批量生成普通邀请码"""
- return [f'epr-{str(shortuuid.uuid())}' for _ in range(num)]
+ return [f"epr-{str(shortuuid.uuid())}" for _ in range(num)]
@staticmethod
def gen_whitelist_code(num: int) -> List[str]:
"""批量生成白名单邀请码"""
- return [f'epw-{str(shortuuid.uuid())}' for _ in range(num)]
+ return [f"epw-{str(shortuuid.uuid())}" for _ in range(num)]
- async def create_invite_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]:
+ async def create_invite_code(
+ self, telegram_id: int, count: int = 1
+ ) -> List[InviteCode]:
"""创建普通邀请码,需检测用户是否有权限"""
user = await self.must_get_user(telegram_id)
if not user.check_create_invite_code():
raise Exception("您没有权限生成普通邀请码。")
code_objs = [
- InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER)
+ InviteCode(
+ code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER
+ )
for code in self.gen_register_code(count)
]
return await InviteCodeOrm().bulk_add(code_objs)
- async def create_whitelist_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]:
+ async def create_whitelist_code(
+ self, telegram_id: int, count: int = 1
+ ) -> List[InviteCode]:
"""创建白名单邀请码,需检测用户是否有权限"""
user = await self.must_get_user(telegram_id)
if not user.check_create_whitelist_code():
raise Exception("您没有权限生成白名单邀请码。")
code_objs = [
- InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST)
+ InviteCode(
+ code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST
+ )
for code in self.gen_whitelist_code(count)
]
return await InviteCodeOrm().bulk_add(code_objs)
@@ -125,7 +137,9 @@ async def emby_info(self, telegram_id: int) -> Tuple[User, Dict]:
raise Exception("该用户尚未绑定 Emby 账号。")
emby_user = self.emby_api.get_user(str(user.emby_id))
if not emby_user:
- raise Exception("从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。")
+ raise Exception(
+ "从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。"
+ )
return user, emby_user
async def first_or_create_emby_config(self) -> Config:
@@ -133,14 +147,14 @@ async def first_or_create_emby_config(self) -> Config:
emby_config = await ConfigOrm().query_one(conds=[Config.id == 1])
if not emby_config:
emby_config = Config(
- register_public_user=0,
- register_public_time=0,
- total_register_user=0
+ register_public_user=0, register_public_time=0, total_register_user=0
)
await ConfigOrm().add(emby_config)
return emby_config
- async def emby_create_user(self, telegram_id: int, username: str, password: str) -> User:
+ async def emby_create_user(
+ self, telegram_id: int, username: str, password: str
+ ) -> User:
"""创建 Emby 用户(外部调用入口),先判断各种配置是否允许注册,然后调用内部的 _emby_create_user"""
user = await self.get_or_create_user_by_telegram_id(telegram_id)
if user.has_emby_account():
@@ -178,14 +192,13 @@ async def _check_register_permission(self, user: User, emby_config: Config) -> b
enable_register = True
if 0 < emby_config.register_public_time < datetime.now().timestamp():
await ConfigOrm().update(
- values={'register_public_time': 0},
- conds=[Config.id == 1]
+ values={"register_public_time": 0}, conds=[Config.id == 1]
)
return enable_register
async def redeem_code(self, telegram_id: int, code: str):
"""使用邀请码,分为普通注册邀请码和白名单邀请码"""
- pattern = re.compile(r'^(epr|epw)-[A-Za-z0-9]+$')
+ pattern = re.compile(r"^(epr|epw)-[A-Za-z0-9]+$")
if not pattern.match(code):
raise Exception("邀请码格式不正确。")
@@ -226,7 +239,7 @@ async def redeem_code(self, telegram_id: int, code: str):
return valid_code
- async def reset_password(self, telegram_id: int, password: str = '') -> bool:
+ async def reset_password(self, telegram_id: int, password: str = "") -> bool:
"""重置用户的 Emby 密码。"""
user = await self.must_get_emby_user(telegram_id)
try:
@@ -237,7 +250,9 @@ async def reset_password(self, telegram_id: int, password: str = '') -> bool:
logger.error(f"重置密码失败: {e}")
return False
- async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None) -> bool:
+ async def emby_ban(
+ self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None
+ ) -> bool:
"""禁用用户"""
if operator_telegram_id is not None:
admin_user = await self.must_get_user(operator_telegram_id)
@@ -253,14 +268,16 @@ async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Op
user.reason = reason
await UserOrm().update(
{"ban_time": user.ban_time, "reason": reason},
- conds=[User.id == user.id]
+ conds=[User.id == user.id],
)
return True
except Exception as e:
logger.error(f"禁用用户失败: {e}")
return False
- async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int] = None) -> bool:
+ async def emby_unban(
+ self, telegram_id: int, operator_telegram_id: Optional[int] = None
+ ) -> bool:
"""解禁用户"""
if operator_telegram_id is not None:
admin_user = await self.must_get_user(operator_telegram_id)
@@ -275,16 +292,19 @@ async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int]
user.ban_time = 0
user.reason = ""
await UserOrm().update(
- {"ban_time": 0, "reason": None},
- conds=[User.id == user.id]
+ {"ban_time": 0, "reason": None}, conds=[User.id == user.id]
)
return True
except Exception as e:
logger.error(f"解禁用户失败: {e}")
return False
- async def set_emby_config(self, telegram_id: int, register_public_user: Optional[int] = None,
- register_public_time: Optional[int] = None) -> Config:
+ async def set_emby_config(
+ self,
+ telegram_id: int,
+ register_public_user: Optional[int] = None,
+ register_public_time: Optional[int] = None,
+ ) -> Config:
"""设置 Emby 注册相关配置,如公共注册名额和公共注册截止时间"""
user = await self.must_get_user(telegram_id)
user.check_set_emby_config()
@@ -300,10 +320,10 @@ async def set_emby_config(self, telegram_id: int, register_public_user: Optional
await ConfigOrm().update(
values={
- 'register_public_user': emby_config.register_public_user,
- 'register_public_time': emby_config.register_public_time
+ "register_public_user": emby_config.register_public_user,
+ "register_public_time": emby_config.register_public_time,
},
- conds=[Config.id == 1]
+ conds=[Config.id == 1],
)
return emby_config
@@ -324,4 +344,4 @@ async def update_user_router(self, telegram_id: int, new_index: str) -> bool:
async def get_router_list(self, telegram_id: int) -> List[Dict]:
"""获取所有可用线路"""
await self.must_get_emby_user(telegram_id)
- return self.emby_router_api.query_all_route()
\ No newline at end of file
+ return self.emby_router_api.query_all_route()