From 00ce1b91fc3307459340fe580e2df06b64735d43 Mon Sep 17 00:00:00 2001 From: quby Date: Wed, 12 Feb 2025 20:38:48 +0800 Subject: [PATCH 1/3] Refactor(commands): refactor argument validation to decorator with formatting --- bot/commands.py | 265 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 180 insertions(+), 85 deletions(-) diff --git a/bot/commands.py b/bot/commands.py index 2a3d801..c861633 100644 --- a/bot/commands.py +++ b/bot/commands.py @@ -1,12 +1,22 @@ import logging +import functools from datetime import datetime from pyrogram import filters from pyrogram.enums import ParseMode -from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery, Message +from pyrogram.types import ( + InlineKeyboardButton, + InlineKeyboardMarkup, + CallbackQuery, + Message, +) from bot.bot_client import BotClient -from bot.filters import user_in_group_on_filter, admin_user_on_filter, emby_user_on_filter +from bot.filters import ( + user_in_group_on_filter, + admin_user_on_filter, + emby_user_on_filter, +) from bot.message_helper import get_user_telegram_id from bot.utils import parse_iso8601_to_normal_date from config import config @@ -41,16 +51,32 @@ def _parse_args(message: Message) -> list[str]: parts = message.text.strip().split(" ") return parts[1:] if len(parts) > 1 else [] - async def _ensure_args(self, message: Message, args: list, min_len: int, usage: str): + @staticmethod + def ensure_args(min_len: int, usage: str): """ - 确保命令行参数长度足够,不足则回复用法说明。 + 装饰器:确保命令行参数长度足够,不足则回复用法说明。 """ - if len(args) < min_len: - await self._reply_html(message, f"参数不足,请参考用法:\n{usage}") - return False - return True - async def _send_error(self, message: Message, error: Exception, prefix: str = "操作失败"): + def decorator(func): + @functools.wraps(func) + async def wrapper(self, message, *args, **kwargs): + # 从消息中解析参数 + parsed_args = self._parse_args(message) + if len(parsed_args) < min_len: + await self._reply_html( + message, f"参数不足,请参考用法:\n{usage}" + ) + return + # 将解析好的参数传递给目标函数,避免在函数内部再调用 _parse_args + return await func(self, message, parsed_args, *args, **kwargs) + + return wrapper + + return decorator + + async def _send_error( + self, message: Message, error: Exception, prefix: str = "操作失败" + ): """ 统一的异常捕获后回复方式。 """ @@ -59,22 +85,22 @@ async def _send_error(self, message: Message, error: Exception, prefix: str = " # =============== 各类命令逻辑 =============== - async def create_user(self, message: Message): + @ensure_args(1, "/create <用户名>") + async def create_user(self, message: Message, args: list[str]): """ /create <用户名> """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 1, "/create <用户名>"): - return emby_name = args[0] try: default_password = self.user_service.gen_default_passwd() - user = await self.user_service.emby_create_user(message.from_user.id, emby_name, default_password) + user = await self.user_service.emby_create_user( + message.from_user.id, emby_name, default_password + ) if user and user.has_emby_account(): await self._reply_html( message, - f"✅ 创建用户成功。\n初始密码:{default_password}" + f"✅ 创建用户成功。\n初始密码:{default_password}", ) else: await self._reply_html(message, "❌ 创建用户失败,请稍后重试。") @@ -89,10 +115,17 @@ async def info(self, message: Message): telegram_id = await get_user_telegram_id(self.bot_client.client, message) try: user, emby_info = await self.user_service.emby_info(telegram_id) - last_active = (parse_iso8601_to_normal_date(emby_info.get("LastActivityDate")) - if emby_info.get("LastActivityDate") else "无") - date_created = parse_iso8601_to_normal_date(emby_info.get("DateCreated", "")) - ban_status = "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用" + last_active = ( + parse_iso8601_to_normal_date(emby_info.get("LastActivityDate")) + if emby_info.get("LastActivityDate") + else "无" + ) + date_created = parse_iso8601_to_normal_date( + emby_info.get("DateCreated", "") + ) + ban_status = ( + "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用" + ) reply_text = ( f"👤 用户信息:\n" @@ -105,7 +138,9 @@ async def info(self, message: Message): ) if user.ban_time and user.ban_time > 0: - ban_time = datetime.fromtimestamp(user.ban_time).strftime('%Y-%m-%d %H:%M:%S') + ban_time = datetime.fromtimestamp(user.ban_time).strftime( + "%Y-%m-%d %H:%M:%S" + ) reply_text += f"• 被ban时间:{ban_time}\n" if user.reason: reply_text += f"• 被ban原因:{user.reason}\n" @@ -114,13 +149,11 @@ async def info(self, message: Message): except Exception as e: await self._send_error(message, e, prefix="查询失败") - async def use_code(self, message: Message): + @ensure_args(1, "/use_code <邀请码>") + async def use_code(self, message: Message, args: list[str]): """ /use_code <邀请码> """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 1, "/use_code <邀请码>"): - return code = args[0] telegram_id = message.from_user.id @@ -130,14 +163,18 @@ async def use_code(self, message: Message): return await self._reply_html(message, "❌ 邀请码使用失败") # 根据类型给出不同的回复 if used_code.code_type == InviteCodeType.REGISTER: - await self._reply_html(message, "✅ 邀请码使用成功,您已获得创建账号资格") + await self._reply_html( + message, "✅ 邀请码使用成功,您已获得创建账号资格" + ) else: await self._reply_html(message, "✅ 邀请码使用成功,您已获得白名单资格") # 如果该邀请码在bot中记录了消息,需要删除 if self.code_to_message_id.get(code): code_to_message_id = self.code_to_message_id[code] - await self.bot_client.client.delete_messages(code_to_message_id[0], code_to_message_id[1]) + await self.bot_client.client.delete_messages( + code_to_message_id[0], code_to_message_id[1] + ) del self.code_to_message_id[code] except Exception as e: await self._send_error(message, e, prefix="邀请码使用失败") @@ -148,10 +185,12 @@ async def reset_emby_password(self, message: Message): """ default_password = self.user_service.gen_default_passwd() try: - if await self.user_service.reset_password(message.from_user.id, default_password): + if await self.user_service.reset_password( + message.from_user.id, default_password + ): await self._reply_html( message, - f"✅ 密码重置成功。\n新密码:{default_password}" + f"✅ 密码重置成功。\n新密码:{default_password}", ) else: await self._reply_html(message, "❌ 密码重置失败,请稍后重试。") @@ -168,11 +207,15 @@ async def new_code(self, message: Message): try: num = int(args[0]) except ValueError: - return await self._reply_html(message, "❌ 请输入有效数量 /new_code [整数]") + return await self._reply_html( + message, "❌ 请输入有效数量 /new_code [整数]" + ) num = min(num, 20) try: - code_list = await self.user_service.create_invite_code(message.from_user.id, num) + code_list = await self.user_service.create_invite_code( + message.from_user.id, num + ) for code_obj in code_list: message_text = f"📌 邀请码:\n点击复制👉{code_obj.code}" if message.reply_to_message is not None: @@ -188,10 +231,7 @@ async def new_code(self, message: Message): ) await self._reply_html(message, "✅ 已发送邀请码") else: - msg = await self._reply_html( - message, - message_text - ) + msg = await self._reply_html(message, message_text) self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id) except Exception as e: await self._send_error(message, e, prefix="创建邀请码失败") @@ -206,13 +246,19 @@ async def new_whitelist_code(self, message: Message): try: num = int(args[0]) except ValueError: - return await self._reply_html(message, "❌ 请输入有效数量 /new_whitelist_code [整数]") + return await self._reply_html( + message, "❌ 请输入有效数量 /new_whitelist_code [整数]" + ) num = min(num, 20) try: - code_list = await self.user_service.create_whitelist_code(message.from_user.id, num) + code_list = await self.user_service.create_whitelist_code( + message.from_user.id, num + ) for code_obj in code_list: - message_text = f"📌 白名单邀请码:\n点击复制👉{code_obj.code}" + message_text = ( + f"📌 白名单邀请码:\n点击复制👉{code_obj.code}" + ) if message.reply_to_message is not None: await self.bot_client.client.send_message( chat_id=message.from_user.id, @@ -226,10 +272,7 @@ async def new_whitelist_code(self, message: Message): ) await self._reply_html(message, "✅ 已发送邀请码") else: - msg = await self._reply_html( - message, - message_text - ) + msg = await self._reply_html(message, message_text) self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id) except Exception as e: await self._send_error(message, e, prefix="创建白名单邀请码失败") @@ -246,8 +289,7 @@ async def ban_emby(self, message: Message): try: if await self.user_service.emby_ban(telegram_id, reason, operator_id): await self._reply_html( - message, - f"✅ 已禁用用户 {telegram_id} 的Emby账号" + message, f"✅ 已禁用用户 {telegram_id} 的Emby账号" ) else: await self._reply_html(message, "❌ 禁用失败,请稍后重试。") @@ -263,8 +305,7 @@ async def unban_emby(self, message: Message): try: if await self.user_service.emby_unban(telegram_id, operator_id): await self._reply_html( - message, - f"✅ 已解禁用户 {telegram_id} 的Emby账号" + message, f"✅ 已解禁用户 {telegram_id} 的Emby账号" ) else: await self._reply_html(message, "❌ 解禁失败,请稍后重试。") @@ -278,22 +319,32 @@ async def select_line(self, message: Message): """ try: telegram_id = message.from_user.id - router_list = config.router_list or await self.user_service.get_router_list(telegram_id) + router_list = config.router_list or await self.user_service.get_router_list( + telegram_id + ) # 缓存到 config 中,减少重复获取 if router_list and not config.router_list: config.router_list = router_list user_router = await self.user_service.get_user_router(telegram_id) - user_router_index = user_router.get('index', '') + user_router_index = user_router.get("index", "") message_text = f"当前线路:{user_router_index}\n请选择线路:" message_buttons = [] for router in router_list: - index = router.get('index') - name = router.get('name') + index = router.get("index") + name = router.get("name") # 已选线路高亮 - button_text = f"🔵 {name}" if index == user_router_index else f"⚪ {name}" - message_buttons.append([InlineKeyboardButton(button_text, callback_data=f"SELECTROUTE_{index}")]) + button_text = ( + f"🔵 {name}" if index == user_router_index else f"⚪ {name}" + ) + message_buttons.append( + [ + InlineKeyboardButton( + button_text, callback_data=f"SELECTROUTE_{index}" + ) + ] + ) keyboard = InlineKeyboardMarkup(message_buttons) await self._reply_html(message, message_text, reply_markup=keyboard) @@ -307,8 +358,14 @@ async def group_member_change_handler(self, clent, message: Message): if message.left_chat_member: left_member_id = message.left_chat_member.id left_member = await self.user_service.must_get_user(left_member_id) - if left_member.has_emby_account() and not left_member.is_emby_baned() and not left_member.is_whitelist: - await self.user_service.emby_ban(message.left_chat_member.id, "用户已退出群组") + if ( + left_member.has_emby_account() + and not left_member.is_emby_baned() + and not left_member.is_whitelist + ): + await self.user_service.emby_ban( + message.left_chat_member.id, "用户已退出群组" + ) config.group_members.pop(message.left_chat_member.id, None) if message.new_chat_members: for new_member in message.new_chat_members: @@ -318,20 +375,24 @@ async def handle_callback_query(self, client, callback_query: CallbackQuery): """ 回调按钮事件统一处理,如切换线路。 """ - data = callback_query.data.split('_') - if data[0] == 'SELECTROUTE': + data = callback_query.data.split("_") + if data[0] == "SELECTROUTE": index = data[1] try: if not config.router_list: await callback_query.answer("尚未加载线路列表,请稍后重试") return - selected_router = next((r for r in config.router_list if r['index'] == index), None) + selected_router = next( + (r for r in config.router_list if r["index"] == index), None + ) if not selected_router: await callback_query.answer("线路不存在") return - await self.user_service.update_user_router(callback_query.from_user.id, index) + await self.user_service.update_user_router( + callback_query.from_user.id, index + ) await callback_query.answer("线路已更新") await callback_query.message.edit( f"已选择 {selected_router['name']}\n" @@ -357,19 +418,17 @@ async def count(self, message: Message): f"🎬 电影数量:{count_data.get('MovieCount', 0)}\n" f"📽️ 剧集数量:{count_data.get('SeriesCount', 0)}\n" f"🎞️ 总集数:{count_data.get('EpisodeCount', 0)}\n" - ) + ), ) except Exception as e: await self._send_error(message, e, prefix="查询失败") - async def register_until(self, message: Message): + @ensure_args(2, "/register_until 2023-10-01 12:00:00") + async def register_until(self, message: Message, args: list[str]): """ /register_until <时间: YYYY-MM-DD HH:MM:SS> 限时开放注册 """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 2, "/register_until 2023-10-01 12:00:00"): - return time_str = " ".join(args) try: @@ -378,24 +437,30 @@ async def register_until(self, message: Message): if time < now: return await self._reply_html(message, "❌ 时间必须晚于当前时间") - await self.user_service.set_emby_config(message.from_user.id, register_public_time=int(time.timestamp())) - await self._reply_html(message, f"✅ 已开放注册,截止时间:{time_str}") + await self.user_service.set_emby_config( + message.from_user.id, register_public_time=int(time.timestamp()) + ) + await self._reply_html( + message, f"✅ 已开放注册,截止时间:{time_str}" + ) except Exception as e: await self._send_error(message, e, prefix="开放注册失败") - async def register_amount(self, message: Message): + @ensure_args(1, "/register_amount <人数>") + async def register_amount(self, message: Message, args: list[str]): """ /register_amount <人数> 开放指定数量的注册名额 """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 1, "/register_amount <人数>"): - return try: amount = int(args[0]) - await self.user_service.set_emby_config(message.from_user.id, register_public_user=amount) - await self._reply_html(message, f"✅ 已开放注册,名额:{amount}") + await self.user_service.set_emby_config( + message.from_user.id, register_public_user=amount + ) + await self._reply_html( + message, f"✅ 已开放注册,名额:{amount}" + ) except Exception as e: await self._send_error(message, e, prefix="开放注册失败") @@ -429,59 +494,87 @@ async def help_command(self, message: Message): # =============== 命令挂载 =============== def setup_commands(self): - @self.bot_client.client.on_message(filters.private & filters.command(["help", "start"])) + @self.bot_client.client.on_message( + filters.private & filters.command(["help", "start"]) + ) async def c_help(client, message): await self.help_command(message) - @self.bot_client.client.on_message(filters.command("count") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.command("count") & user_in_group_on_filter + ) async def c_count(client, message): await self.count(message) - @self.bot_client.client.on_message(filters.command("info") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.command("info") & user_in_group_on_filter + ) async def c_info(client, message): await self.info(message) - @self.bot_client.client.on_message(filters.private & filters.command("use_code") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.private & filters.command("use_code") & user_in_group_on_filter + ) async def c_use_code(client, message): await self.use_code(message) - @self.bot_client.client.on_message(filters.private & filters.command("create") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.private & filters.command("create") & user_in_group_on_filter + ) async def c_create_user(client, message): await self.create_user(message) @self.bot_client.client.on_message( - filters.private & filters.command("reset_emby_password") & user_in_group_on_filter & emby_user_on_filter + filters.private + & filters.command("reset_emby_password") + & user_in_group_on_filter + & emby_user_on_filter ) async def c_reset_emby_password(client, message): await self.reset_emby_password(message) @self.bot_client.client.on_message( - filters.private & filters.command("select_line") & user_in_group_on_filter & emby_user_on_filter + filters.private + & filters.command("select_line") + & user_in_group_on_filter + & emby_user_on_filter ) async def c_select_line(client, message): await self.select_line(message) - @self.bot_client.client.on_message(filters.command("new_code") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("new_code") & admin_user_on_filter + ) async def c_new_code(client, message): await self.new_code(message) - @self.bot_client.client.on_message(filters.command("new_whitelist_code") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("new_whitelist_code") & admin_user_on_filter + ) async def c_new_whitelist_code(client, message): await self.new_whitelist_code(message) - @self.bot_client.client.on_message(filters.command("ban_emby") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("ban_emby") & admin_user_on_filter + ) async def c_ban_emby(client, message): await self.ban_emby(message) - @self.bot_client.client.on_message(filters.command("unban_emby") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("unban_emby") & admin_user_on_filter + ) async def c_unban_emby(client, message): await self.unban_emby(message) - @self.bot_client.client.on_message(filters.command("register_until") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("register_until") & admin_user_on_filter + ) async def c_register_until(client, message): await self.register_until(message) - @self.bot_client.client.on_message(filters.command("register_amount") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("register_amount") & admin_user_on_filter + ) async def c_register_amount(client, message): await self.register_amount(message) @@ -489,6 +582,8 @@ async def c_register_amount(client, message): async def c_select_line_cb(client, callback_query): await self.handle_callback_query(client, callback_query) - @self.bot_client.client.on_message(filters.left_chat_member | filters.new_chat_members) + @self.bot_client.client.on_message( + filters.left_chat_member | filters.new_chat_members + ) async def group_member_change_handler(client, message): - await self.group_member_change_handler(client, message) \ No newline at end of file + await self.group_member_change_handler(client, message) From 9fb62fe135c025135e81643952632b0821e54f7d Mon Sep 17 00:00:00 2001 From: quby Date: Wed, 12 Feb 2025 20:45:20 +0800 Subject: [PATCH 2/3] fix: fix AttributeError when accessing group member username - Resolved AttributeError caused by attempting to call 'get' on a 'User' object - Updated code to properly check if group member data exists before accessing 'username' --- services/user_service.py | 78 +++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/services/user_service.py b/services/user_service.py index 40e073f..c460761 100644 --- a/services/user_service.py +++ b/services/user_service.py @@ -33,7 +33,9 @@ async def get_or_create_user_by_telegram_id(telegram_id: int) -> User: default_user = User( telegram_id=telegram_id, is_admin=telegram_id in config.admin_list, - telegram_name=config.group_members.get(telegram_id, {}).get('username'), + telegram_name=config.group_members.get(telegram_id, {}).username + if config.group_members.get(telegram_id) + else None, ) user_id = await UserOrm().add(default_user) user = default_user @@ -62,7 +64,9 @@ async def must_get_emby_user(self, telegram_id: int) -> User: raise Exception("该用户的 Emby 账号已被禁用,无法执行此操作。") return user - async def _emby_create_user(self, telegram_id: int, username: str, password: str) -> User: + async def _emby_create_user( + self, telegram_id: int, username: str, password: str + ) -> User: """内部使用:真正调用 Emby API 创建用户,并设置初始密码""" user = await self.get_or_create_user_by_telegram_id(telegram_id) emby_user = self.emby_api.create_user(username) @@ -82,38 +86,46 @@ async def _emby_create_user(self, telegram_id: int, username: str, password: str @staticmethod def gen_default_passwd() -> str: """生成默认密码:随机6位的字母数字组合""" - return ''.join(sample(string.ascii_letters + string.digits, 6)) + return "".join(sample(string.ascii_letters + string.digits, 6)) @staticmethod def gen_register_code(num: int) -> List[str]: """批量生成普通邀请码""" - return [f'epr-{str(shortuuid.uuid())}' for _ in range(num)] + return [f"epr-{str(shortuuid.uuid())}" for _ in range(num)] @staticmethod def gen_whitelist_code(num: int) -> List[str]: """批量生成白名单邀请码""" - return [f'epw-{str(shortuuid.uuid())}' for _ in range(num)] + return [f"epw-{str(shortuuid.uuid())}" for _ in range(num)] - async def create_invite_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]: + async def create_invite_code( + self, telegram_id: int, count: int = 1 + ) -> List[InviteCode]: """创建普通邀请码,需检测用户是否有权限""" user = await self.must_get_user(telegram_id) if not user.check_create_invite_code(): raise Exception("您没有权限生成普通邀请码。") code_objs = [ - InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER) + InviteCode( + code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER + ) for code in self.gen_register_code(count) ] return await InviteCodeOrm().bulk_add(code_objs) - async def create_whitelist_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]: + async def create_whitelist_code( + self, telegram_id: int, count: int = 1 + ) -> List[InviteCode]: """创建白名单邀请码,需检测用户是否有权限""" user = await self.must_get_user(telegram_id) if not user.check_create_whitelist_code(): raise Exception("您没有权限生成白名单邀请码。") code_objs = [ - InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST) + InviteCode( + code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST + ) for code in self.gen_whitelist_code(count) ] return await InviteCodeOrm().bulk_add(code_objs) @@ -125,7 +137,9 @@ async def emby_info(self, telegram_id: int) -> Tuple[User, Dict]: raise Exception("该用户尚未绑定 Emby 账号。") emby_user = self.emby_api.get_user(str(user.emby_id)) if not emby_user: - raise Exception("从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。") + raise Exception( + "从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。" + ) return user, emby_user async def first_or_create_emby_config(self) -> Config: @@ -133,14 +147,14 @@ async def first_or_create_emby_config(self) -> Config: emby_config = await ConfigOrm().query_one(conds=[Config.id == 1]) if not emby_config: emby_config = Config( - register_public_user=0, - register_public_time=0, - total_register_user=0 + register_public_user=0, register_public_time=0, total_register_user=0 ) await ConfigOrm().add(emby_config) return emby_config - async def emby_create_user(self, telegram_id: int, username: str, password: str) -> User: + async def emby_create_user( + self, telegram_id: int, username: str, password: str + ) -> User: """创建 Emby 用户(外部调用入口),先判断各种配置是否允许注册,然后调用内部的 _emby_create_user""" user = await self.get_or_create_user_by_telegram_id(telegram_id) if user.has_emby_account(): @@ -178,14 +192,13 @@ async def _check_register_permission(self, user: User, emby_config: Config) -> b enable_register = True if 0 < emby_config.register_public_time < datetime.now().timestamp(): await ConfigOrm().update( - values={'register_public_time': 0}, - conds=[Config.id == 1] + values={"register_public_time": 0}, conds=[Config.id == 1] ) return enable_register async def redeem_code(self, telegram_id: int, code: str): """使用邀请码,分为普通注册邀请码和白名单邀请码""" - pattern = re.compile(r'^(epr|epw)-[A-Za-z0-9]+$') + pattern = re.compile(r"^(epr|epw)-[A-Za-z0-9]+$") if not pattern.match(code): raise Exception("邀请码格式不正确。") @@ -226,7 +239,7 @@ async def redeem_code(self, telegram_id: int, code: str): return valid_code - async def reset_password(self, telegram_id: int, password: str = '') -> bool: + async def reset_password(self, telegram_id: int, password: str = "") -> bool: """重置用户的 Emby 密码。""" user = await self.must_get_emby_user(telegram_id) try: @@ -237,7 +250,9 @@ async def reset_password(self, telegram_id: int, password: str = '') -> bool: logger.error(f"重置密码失败: {e}") return False - async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None) -> bool: + async def emby_ban( + self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None + ) -> bool: """禁用用户""" if operator_telegram_id is not None: admin_user = await self.must_get_user(operator_telegram_id) @@ -253,14 +268,16 @@ async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Op user.reason = reason await UserOrm().update( {"ban_time": user.ban_time, "reason": reason}, - conds=[User.id == user.id] + conds=[User.id == user.id], ) return True except Exception as e: logger.error(f"禁用用户失败: {e}") return False - async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int] = None) -> bool: + async def emby_unban( + self, telegram_id: int, operator_telegram_id: Optional[int] = None + ) -> bool: """解禁用户""" if operator_telegram_id is not None: admin_user = await self.must_get_user(operator_telegram_id) @@ -275,16 +292,19 @@ async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int] user.ban_time = 0 user.reason = "" await UserOrm().update( - {"ban_time": 0, "reason": None}, - conds=[User.id == user.id] + {"ban_time": 0, "reason": None}, conds=[User.id == user.id] ) return True except Exception as e: logger.error(f"解禁用户失败: {e}") return False - async def set_emby_config(self, telegram_id: int, register_public_user: Optional[int] = None, - register_public_time: Optional[int] = None) -> Config: + async def set_emby_config( + self, + telegram_id: int, + register_public_user: Optional[int] = None, + register_public_time: Optional[int] = None, + ) -> Config: """设置 Emby 注册相关配置,如公共注册名额和公共注册截止时间""" user = await self.must_get_user(telegram_id) user.check_set_emby_config() @@ -300,10 +320,10 @@ async def set_emby_config(self, telegram_id: int, register_public_user: Optional await ConfigOrm().update( values={ - 'register_public_user': emby_config.register_public_user, - 'register_public_time': emby_config.register_public_time + "register_public_user": emby_config.register_public_user, + "register_public_time": emby_config.register_public_time, }, - conds=[Config.id == 1] + conds=[Config.id == 1], ) return emby_config @@ -324,4 +344,4 @@ async def update_user_router(self, telegram_id: int, new_index: str) -> bool: async def get_router_list(self, telegram_id: int) -> List[Dict]: """获取所有可用线路""" await self.must_get_emby_user(telegram_id) - return self.emby_router_api.query_all_route() \ No newline at end of file + return self.emby_router_api.query_all_route() From d98a7338ed587d475e26282be2c16cf4e0bfb07b Mon Sep 17 00:00:00 2001 From: quby Date: Wed, 12 Feb 2025 20:48:41 +0800 Subject: [PATCH 3/3] fix(logger): update logger configuration for better IDE support - Changed logger to output to both terminal and log file - Added StreamHandler to output logs to terminal for easier debugging in IDEs - Retained FileHandler to write logs to default.log for persistence - Set logging level based on config.log_level --- app.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/app.py b/app.py index e4774b3..9d9d04b 100644 --- a/app.py +++ b/app.py @@ -15,6 +15,9 @@ # Initialize logger logger = logging.getLogger(__name__) +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +logger.addHandler(console_handler) async def create_database_if_not_exists() -> None: @@ -51,22 +54,36 @@ async def _init_db() -> None: def _init_logger() -> None: """初始化日志记录器。""" - logging.basicConfig( - format="%(levelname)s [%(asctime)s] %(name)s - %(message)s", + handler = logging.StreamHandler() # 输出到终端 + formatter = logging.Formatter( + fmt="%(levelname)s [%(asctime)s] %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", - level=config.log_level, - filename="default.log", ) + handler.setFormatter(formatter) + + # 添加流处理器 + logger.addHandler(handler) + + # 设置日志级别 + logger.setLevel(config.log_level) + + # 如果你需要将日志写入文件,可以继续保持原有的文件配置: + file_handler = logging.FileHandler("default.log") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + def _init_tz() -> None: """初始化时区设置。""" if config.timezone: try: timezone = pytz.timezone(config.timezone) - now = datetime.now(timezone).strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now(timezone).strftime("%Y-%m-%d %H:%M:%S") logger.info(f"时区已设置为: {config.timezone},当前时间: {now}") except pytz.UnknownTimeZoneError: - logger.error(f"无效的时区配置: {config.timezone},请检查 config.timezone 设置。") + logger.error( + f"无效的时区配置: {config.timezone},请检查 config.timezone 设置。" + ) async def setup_bot() -> BotClient: @@ -94,7 +111,7 @@ async def main() -> None: _init_logger() _init_tz() - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info(f"程序启动时间: {now}") await _init_db()