From 00ce1b91fc3307459340fe580e2df06b64735d43 Mon Sep 17 00:00:00 2001 From: quby Date: Wed, 12 Feb 2025 20:38:48 +0800 Subject: [PATCH 1/6] Refactor(commands): refactor argument validation to decorator with formatting --- bot/commands.py | 265 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 180 insertions(+), 85 deletions(-) diff --git a/bot/commands.py b/bot/commands.py index 2a3d801..c861633 100644 --- a/bot/commands.py +++ b/bot/commands.py @@ -1,12 +1,22 @@ import logging +import functools from datetime import datetime from pyrogram import filters from pyrogram.enums import ParseMode -from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery, Message +from pyrogram.types import ( + InlineKeyboardButton, + InlineKeyboardMarkup, + CallbackQuery, + Message, +) from bot.bot_client import BotClient -from bot.filters import user_in_group_on_filter, admin_user_on_filter, emby_user_on_filter +from bot.filters import ( + user_in_group_on_filter, + admin_user_on_filter, + emby_user_on_filter, +) from bot.message_helper import get_user_telegram_id from bot.utils import parse_iso8601_to_normal_date from config import config @@ -41,16 +51,32 @@ def _parse_args(message: Message) -> list[str]: parts = message.text.strip().split(" ") return parts[1:] if len(parts) > 1 else [] - async def _ensure_args(self, message: Message, args: list, min_len: int, usage: str): + @staticmethod + def ensure_args(min_len: int, usage: str): """ - 确保命令行参数长度足够,不足则回复用法说明。 + 装饰器:确保命令行参数长度足够,不足则回复用法说明。 """ - if len(args) < min_len: - await self._reply_html(message, f"参数不足,请参考用法:\n{usage}") - return False - return True - async def _send_error(self, message: Message, error: Exception, prefix: str = "操作失败"): + def decorator(func): + @functools.wraps(func) + async def wrapper(self, message, *args, **kwargs): + # 从消息中解析参数 + parsed_args = self._parse_args(message) + if len(parsed_args) < min_len: + await self._reply_html( + message, f"参数不足,请参考用法:\n{usage}" + ) + return + # 将解析好的参数传递给目标函数,避免在函数内部再调用 _parse_args + return await func(self, message, parsed_args, *args, **kwargs) + + return wrapper + + return decorator + + async def _send_error( + self, message: Message, error: Exception, prefix: str = "操作失败" + ): """ 统一的异常捕获后回复方式。 """ @@ -59,22 +85,22 @@ async def _send_error(self, message: Message, error: Exception, prefix: str = " # =============== 各类命令逻辑 =============== - async def create_user(self, message: Message): + @ensure_args(1, "/create <用户名>") + async def create_user(self, message: Message, args: list[str]): """ /create <用户名> """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 1, "/create <用户名>"): - return emby_name = args[0] try: default_password = self.user_service.gen_default_passwd() - user = await self.user_service.emby_create_user(message.from_user.id, emby_name, default_password) + user = await self.user_service.emby_create_user( + message.from_user.id, emby_name, default_password + ) if user and user.has_emby_account(): await self._reply_html( message, - f"✅ 创建用户成功。\n初始密码:{default_password}" + f"✅ 创建用户成功。\n初始密码:{default_password}", ) else: await self._reply_html(message, "❌ 创建用户失败,请稍后重试。") @@ -89,10 +115,17 @@ async def info(self, message: Message): telegram_id = await get_user_telegram_id(self.bot_client.client, message) try: user, emby_info = await self.user_service.emby_info(telegram_id) - last_active = (parse_iso8601_to_normal_date(emby_info.get("LastActivityDate")) - if emby_info.get("LastActivityDate") else "无") - date_created = parse_iso8601_to_normal_date(emby_info.get("DateCreated", "")) - ban_status = "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用" + last_active = ( + parse_iso8601_to_normal_date(emby_info.get("LastActivityDate")) + if emby_info.get("LastActivityDate") + else "无" + ) + date_created = parse_iso8601_to_normal_date( + emby_info.get("DateCreated", "") + ) + ban_status = ( + "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用" + ) reply_text = ( f"👤 用户信息:\n" @@ -105,7 +138,9 @@ async def info(self, message: Message): ) if user.ban_time and user.ban_time > 0: - ban_time = datetime.fromtimestamp(user.ban_time).strftime('%Y-%m-%d %H:%M:%S') + ban_time = datetime.fromtimestamp(user.ban_time).strftime( + "%Y-%m-%d %H:%M:%S" + ) reply_text += f"• 被ban时间:{ban_time}\n" if user.reason: reply_text += f"• 被ban原因:{user.reason}\n" @@ -114,13 +149,11 @@ async def info(self, message: Message): except Exception as e: await self._send_error(message, e, prefix="查询失败") - async def use_code(self, message: Message): + @ensure_args(1, "/use_code <邀请码>") + async def use_code(self, message: Message, args: list[str]): """ /use_code <邀请码> """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 1, "/use_code <邀请码>"): - return code = args[0] telegram_id = message.from_user.id @@ -130,14 +163,18 @@ async def use_code(self, message: Message): return await self._reply_html(message, "❌ 邀请码使用失败") # 根据类型给出不同的回复 if used_code.code_type == InviteCodeType.REGISTER: - await self._reply_html(message, "✅ 邀请码使用成功,您已获得创建账号资格") + await self._reply_html( + message, "✅ 邀请码使用成功,您已获得创建账号资格" + ) else: await self._reply_html(message, "✅ 邀请码使用成功,您已获得白名单资格") # 如果该邀请码在bot中记录了消息,需要删除 if self.code_to_message_id.get(code): code_to_message_id = self.code_to_message_id[code] - await self.bot_client.client.delete_messages(code_to_message_id[0], code_to_message_id[1]) + await self.bot_client.client.delete_messages( + code_to_message_id[0], code_to_message_id[1] + ) del self.code_to_message_id[code] except Exception as e: await self._send_error(message, e, prefix="邀请码使用失败") @@ -148,10 +185,12 @@ async def reset_emby_password(self, message: Message): """ default_password = self.user_service.gen_default_passwd() try: - if await self.user_service.reset_password(message.from_user.id, default_password): + if await self.user_service.reset_password( + message.from_user.id, default_password + ): await self._reply_html( message, - f"✅ 密码重置成功。\n新密码:{default_password}" + f"✅ 密码重置成功。\n新密码:{default_password}", ) else: await self._reply_html(message, "❌ 密码重置失败,请稍后重试。") @@ -168,11 +207,15 @@ async def new_code(self, message: Message): try: num = int(args[0]) except ValueError: - return await self._reply_html(message, "❌ 请输入有效数量 /new_code [整数]") + return await self._reply_html( + message, "❌ 请输入有效数量 /new_code [整数]" + ) num = min(num, 20) try: - code_list = await self.user_service.create_invite_code(message.from_user.id, num) + code_list = await self.user_service.create_invite_code( + message.from_user.id, num + ) for code_obj in code_list: message_text = f"📌 邀请码:\n点击复制👉{code_obj.code}" if message.reply_to_message is not None: @@ -188,10 +231,7 @@ async def new_code(self, message: Message): ) await self._reply_html(message, "✅ 已发送邀请码") else: - msg = await self._reply_html( - message, - message_text - ) + msg = await self._reply_html(message, message_text) self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id) except Exception as e: await self._send_error(message, e, prefix="创建邀请码失败") @@ -206,13 +246,19 @@ async def new_whitelist_code(self, message: Message): try: num = int(args[0]) except ValueError: - return await self._reply_html(message, "❌ 请输入有效数量 /new_whitelist_code [整数]") + return await self._reply_html( + message, "❌ 请输入有效数量 /new_whitelist_code [整数]" + ) num = min(num, 20) try: - code_list = await self.user_service.create_whitelist_code(message.from_user.id, num) + code_list = await self.user_service.create_whitelist_code( + message.from_user.id, num + ) for code_obj in code_list: - message_text = f"📌 白名单邀请码:\n点击复制👉{code_obj.code}" + message_text = ( + f"📌 白名单邀请码:\n点击复制👉{code_obj.code}" + ) if message.reply_to_message is not None: await self.bot_client.client.send_message( chat_id=message.from_user.id, @@ -226,10 +272,7 @@ async def new_whitelist_code(self, message: Message): ) await self._reply_html(message, "✅ 已发送邀请码") else: - msg = await self._reply_html( - message, - message_text - ) + msg = await self._reply_html(message, message_text) self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id) except Exception as e: await self._send_error(message, e, prefix="创建白名单邀请码失败") @@ -246,8 +289,7 @@ async def ban_emby(self, message: Message): try: if await self.user_service.emby_ban(telegram_id, reason, operator_id): await self._reply_html( - message, - f"✅ 已禁用用户 {telegram_id} 的Emby账号" + message, f"✅ 已禁用用户 {telegram_id} 的Emby账号" ) else: await self._reply_html(message, "❌ 禁用失败,请稍后重试。") @@ -263,8 +305,7 @@ async def unban_emby(self, message: Message): try: if await self.user_service.emby_unban(telegram_id, operator_id): await self._reply_html( - message, - f"✅ 已解禁用户 {telegram_id} 的Emby账号" + message, f"✅ 已解禁用户 {telegram_id} 的Emby账号" ) else: await self._reply_html(message, "❌ 解禁失败,请稍后重试。") @@ -278,22 +319,32 @@ async def select_line(self, message: Message): """ try: telegram_id = message.from_user.id - router_list = config.router_list or await self.user_service.get_router_list(telegram_id) + router_list = config.router_list or await self.user_service.get_router_list( + telegram_id + ) # 缓存到 config 中,减少重复获取 if router_list and not config.router_list: config.router_list = router_list user_router = await self.user_service.get_user_router(telegram_id) - user_router_index = user_router.get('index', '') + user_router_index = user_router.get("index", "") message_text = f"当前线路:{user_router_index}\n请选择线路:" message_buttons = [] for router in router_list: - index = router.get('index') - name = router.get('name') + index = router.get("index") + name = router.get("name") # 已选线路高亮 - button_text = f"🔵 {name}" if index == user_router_index else f"⚪ {name}" - message_buttons.append([InlineKeyboardButton(button_text, callback_data=f"SELECTROUTE_{index}")]) + button_text = ( + f"🔵 {name}" if index == user_router_index else f"⚪ {name}" + ) + message_buttons.append( + [ + InlineKeyboardButton( + button_text, callback_data=f"SELECTROUTE_{index}" + ) + ] + ) keyboard = InlineKeyboardMarkup(message_buttons) await self._reply_html(message, message_text, reply_markup=keyboard) @@ -307,8 +358,14 @@ async def group_member_change_handler(self, clent, message: Message): if message.left_chat_member: left_member_id = message.left_chat_member.id left_member = await self.user_service.must_get_user(left_member_id) - if left_member.has_emby_account() and not left_member.is_emby_baned() and not left_member.is_whitelist: - await self.user_service.emby_ban(message.left_chat_member.id, "用户已退出群组") + if ( + left_member.has_emby_account() + and not left_member.is_emby_baned() + and not left_member.is_whitelist + ): + await self.user_service.emby_ban( + message.left_chat_member.id, "用户已退出群组" + ) config.group_members.pop(message.left_chat_member.id, None) if message.new_chat_members: for new_member in message.new_chat_members: @@ -318,20 +375,24 @@ async def handle_callback_query(self, client, callback_query: CallbackQuery): """ 回调按钮事件统一处理,如切换线路。 """ - data = callback_query.data.split('_') - if data[0] == 'SELECTROUTE': + data = callback_query.data.split("_") + if data[0] == "SELECTROUTE": index = data[1] try: if not config.router_list: await callback_query.answer("尚未加载线路列表,请稍后重试") return - selected_router = next((r for r in config.router_list if r['index'] == index), None) + selected_router = next( + (r for r in config.router_list if r["index"] == index), None + ) if not selected_router: await callback_query.answer("线路不存在") return - await self.user_service.update_user_router(callback_query.from_user.id, index) + await self.user_service.update_user_router( + callback_query.from_user.id, index + ) await callback_query.answer("线路已更新") await callback_query.message.edit( f"已选择 {selected_router['name']}\n" @@ -357,19 +418,17 @@ async def count(self, message: Message): f"🎬 电影数量:{count_data.get('MovieCount', 0)}\n" f"📽️ 剧集数量:{count_data.get('SeriesCount', 0)}\n" f"🎞️ 总集数:{count_data.get('EpisodeCount', 0)}\n" - ) + ), ) except Exception as e: await self._send_error(message, e, prefix="查询失败") - async def register_until(self, message: Message): + @ensure_args(2, "/register_until 2023-10-01 12:00:00") + async def register_until(self, message: Message, args: list[str]): """ /register_until <时间: YYYY-MM-DD HH:MM:SS> 限时开放注册 """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 2, "/register_until 2023-10-01 12:00:00"): - return time_str = " ".join(args) try: @@ -378,24 +437,30 @@ async def register_until(self, message: Message): if time < now: return await self._reply_html(message, "❌ 时间必须晚于当前时间") - await self.user_service.set_emby_config(message.from_user.id, register_public_time=int(time.timestamp())) - await self._reply_html(message, f"✅ 已开放注册,截止时间:{time_str}") + await self.user_service.set_emby_config( + message.from_user.id, register_public_time=int(time.timestamp()) + ) + await self._reply_html( + message, f"✅ 已开放注册,截止时间:{time_str}" + ) except Exception as e: await self._send_error(message, e, prefix="开放注册失败") - async def register_amount(self, message: Message): + @ensure_args(1, "/register_amount <人数>") + async def register_amount(self, message: Message, args: list[str]): """ /register_amount <人数> 开放指定数量的注册名额 """ - args = self._parse_args(message) - if not await self._ensure_args(message, args, 1, "/register_amount <人数>"): - return try: amount = int(args[0]) - await self.user_service.set_emby_config(message.from_user.id, register_public_user=amount) - await self._reply_html(message, f"✅ 已开放注册,名额:{amount}") + await self.user_service.set_emby_config( + message.from_user.id, register_public_user=amount + ) + await self._reply_html( + message, f"✅ 已开放注册,名额:{amount}" + ) except Exception as e: await self._send_error(message, e, prefix="开放注册失败") @@ -429,59 +494,87 @@ async def help_command(self, message: Message): # =============== 命令挂载 =============== def setup_commands(self): - @self.bot_client.client.on_message(filters.private & filters.command(["help", "start"])) + @self.bot_client.client.on_message( + filters.private & filters.command(["help", "start"]) + ) async def c_help(client, message): await self.help_command(message) - @self.bot_client.client.on_message(filters.command("count") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.command("count") & user_in_group_on_filter + ) async def c_count(client, message): await self.count(message) - @self.bot_client.client.on_message(filters.command("info") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.command("info") & user_in_group_on_filter + ) async def c_info(client, message): await self.info(message) - @self.bot_client.client.on_message(filters.private & filters.command("use_code") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.private & filters.command("use_code") & user_in_group_on_filter + ) async def c_use_code(client, message): await self.use_code(message) - @self.bot_client.client.on_message(filters.private & filters.command("create") & user_in_group_on_filter) + @self.bot_client.client.on_message( + filters.private & filters.command("create") & user_in_group_on_filter + ) async def c_create_user(client, message): await self.create_user(message) @self.bot_client.client.on_message( - filters.private & filters.command("reset_emby_password") & user_in_group_on_filter & emby_user_on_filter + filters.private + & filters.command("reset_emby_password") + & user_in_group_on_filter + & emby_user_on_filter ) async def c_reset_emby_password(client, message): await self.reset_emby_password(message) @self.bot_client.client.on_message( - filters.private & filters.command("select_line") & user_in_group_on_filter & emby_user_on_filter + filters.private + & filters.command("select_line") + & user_in_group_on_filter + & emby_user_on_filter ) async def c_select_line(client, message): await self.select_line(message) - @self.bot_client.client.on_message(filters.command("new_code") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("new_code") & admin_user_on_filter + ) async def c_new_code(client, message): await self.new_code(message) - @self.bot_client.client.on_message(filters.command("new_whitelist_code") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("new_whitelist_code") & admin_user_on_filter + ) async def c_new_whitelist_code(client, message): await self.new_whitelist_code(message) - @self.bot_client.client.on_message(filters.command("ban_emby") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("ban_emby") & admin_user_on_filter + ) async def c_ban_emby(client, message): await self.ban_emby(message) - @self.bot_client.client.on_message(filters.command("unban_emby") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("unban_emby") & admin_user_on_filter + ) async def c_unban_emby(client, message): await self.unban_emby(message) - @self.bot_client.client.on_message(filters.command("register_until") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("register_until") & admin_user_on_filter + ) async def c_register_until(client, message): await self.register_until(message) - @self.bot_client.client.on_message(filters.command("register_amount") & admin_user_on_filter) + @self.bot_client.client.on_message( + filters.command("register_amount") & admin_user_on_filter + ) async def c_register_amount(client, message): await self.register_amount(message) @@ -489,6 +582,8 @@ async def c_register_amount(client, message): async def c_select_line_cb(client, callback_query): await self.handle_callback_query(client, callback_query) - @self.bot_client.client.on_message(filters.left_chat_member | filters.new_chat_members) + @self.bot_client.client.on_message( + filters.left_chat_member | filters.new_chat_members + ) async def group_member_change_handler(client, message): - await self.group_member_change_handler(client, message) \ No newline at end of file + await self.group_member_change_handler(client, message) From 9fb62fe135c025135e81643952632b0821e54f7d Mon Sep 17 00:00:00 2001 From: quby Date: Wed, 12 Feb 2025 20:45:20 +0800 Subject: [PATCH 2/6] fix: fix AttributeError when accessing group member username - Resolved AttributeError caused by attempting to call 'get' on a 'User' object - Updated code to properly check if group member data exists before accessing 'username' --- services/user_service.py | 78 +++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/services/user_service.py b/services/user_service.py index 40e073f..c460761 100644 --- a/services/user_service.py +++ b/services/user_service.py @@ -33,7 +33,9 @@ async def get_or_create_user_by_telegram_id(telegram_id: int) -> User: default_user = User( telegram_id=telegram_id, is_admin=telegram_id in config.admin_list, - telegram_name=config.group_members.get(telegram_id, {}).get('username'), + telegram_name=config.group_members.get(telegram_id, {}).username + if config.group_members.get(telegram_id) + else None, ) user_id = await UserOrm().add(default_user) user = default_user @@ -62,7 +64,9 @@ async def must_get_emby_user(self, telegram_id: int) -> User: raise Exception("该用户的 Emby 账号已被禁用,无法执行此操作。") return user - async def _emby_create_user(self, telegram_id: int, username: str, password: str) -> User: + async def _emby_create_user( + self, telegram_id: int, username: str, password: str + ) -> User: """内部使用:真正调用 Emby API 创建用户,并设置初始密码""" user = await self.get_or_create_user_by_telegram_id(telegram_id) emby_user = self.emby_api.create_user(username) @@ -82,38 +86,46 @@ async def _emby_create_user(self, telegram_id: int, username: str, password: str @staticmethod def gen_default_passwd() -> str: """生成默认密码:随机6位的字母数字组合""" - return ''.join(sample(string.ascii_letters + string.digits, 6)) + return "".join(sample(string.ascii_letters + string.digits, 6)) @staticmethod def gen_register_code(num: int) -> List[str]: """批量生成普通邀请码""" - return [f'epr-{str(shortuuid.uuid())}' for _ in range(num)] + return [f"epr-{str(shortuuid.uuid())}" for _ in range(num)] @staticmethod def gen_whitelist_code(num: int) -> List[str]: """批量生成白名单邀请码""" - return [f'epw-{str(shortuuid.uuid())}' for _ in range(num)] + return [f"epw-{str(shortuuid.uuid())}" for _ in range(num)] - async def create_invite_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]: + async def create_invite_code( + self, telegram_id: int, count: int = 1 + ) -> List[InviteCode]: """创建普通邀请码,需检测用户是否有权限""" user = await self.must_get_user(telegram_id) if not user.check_create_invite_code(): raise Exception("您没有权限生成普通邀请码。") code_objs = [ - InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER) + InviteCode( + code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER + ) for code in self.gen_register_code(count) ] return await InviteCodeOrm().bulk_add(code_objs) - async def create_whitelist_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]: + async def create_whitelist_code( + self, telegram_id: int, count: int = 1 + ) -> List[InviteCode]: """创建白名单邀请码,需检测用户是否有权限""" user = await self.must_get_user(telegram_id) if not user.check_create_whitelist_code(): raise Exception("您没有权限生成白名单邀请码。") code_objs = [ - InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST) + InviteCode( + code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST + ) for code in self.gen_whitelist_code(count) ] return await InviteCodeOrm().bulk_add(code_objs) @@ -125,7 +137,9 @@ async def emby_info(self, telegram_id: int) -> Tuple[User, Dict]: raise Exception("该用户尚未绑定 Emby 账号。") emby_user = self.emby_api.get_user(str(user.emby_id)) if not emby_user: - raise Exception("从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。") + raise Exception( + "从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。" + ) return user, emby_user async def first_or_create_emby_config(self) -> Config: @@ -133,14 +147,14 @@ async def first_or_create_emby_config(self) -> Config: emby_config = await ConfigOrm().query_one(conds=[Config.id == 1]) if not emby_config: emby_config = Config( - register_public_user=0, - register_public_time=0, - total_register_user=0 + register_public_user=0, register_public_time=0, total_register_user=0 ) await ConfigOrm().add(emby_config) return emby_config - async def emby_create_user(self, telegram_id: int, username: str, password: str) -> User: + async def emby_create_user( + self, telegram_id: int, username: str, password: str + ) -> User: """创建 Emby 用户(外部调用入口),先判断各种配置是否允许注册,然后调用内部的 _emby_create_user""" user = await self.get_or_create_user_by_telegram_id(telegram_id) if user.has_emby_account(): @@ -178,14 +192,13 @@ async def _check_register_permission(self, user: User, emby_config: Config) -> b enable_register = True if 0 < emby_config.register_public_time < datetime.now().timestamp(): await ConfigOrm().update( - values={'register_public_time': 0}, - conds=[Config.id == 1] + values={"register_public_time": 0}, conds=[Config.id == 1] ) return enable_register async def redeem_code(self, telegram_id: int, code: str): """使用邀请码,分为普通注册邀请码和白名单邀请码""" - pattern = re.compile(r'^(epr|epw)-[A-Za-z0-9]+$') + pattern = re.compile(r"^(epr|epw)-[A-Za-z0-9]+$") if not pattern.match(code): raise Exception("邀请码格式不正确。") @@ -226,7 +239,7 @@ async def redeem_code(self, telegram_id: int, code: str): return valid_code - async def reset_password(self, telegram_id: int, password: str = '') -> bool: + async def reset_password(self, telegram_id: int, password: str = "") -> bool: """重置用户的 Emby 密码。""" user = await self.must_get_emby_user(telegram_id) try: @@ -237,7 +250,9 @@ async def reset_password(self, telegram_id: int, password: str = '') -> bool: logger.error(f"重置密码失败: {e}") return False - async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None) -> bool: + async def emby_ban( + self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None + ) -> bool: """禁用用户""" if operator_telegram_id is not None: admin_user = await self.must_get_user(operator_telegram_id) @@ -253,14 +268,16 @@ async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Op user.reason = reason await UserOrm().update( {"ban_time": user.ban_time, "reason": reason}, - conds=[User.id == user.id] + conds=[User.id == user.id], ) return True except Exception as e: logger.error(f"禁用用户失败: {e}") return False - async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int] = None) -> bool: + async def emby_unban( + self, telegram_id: int, operator_telegram_id: Optional[int] = None + ) -> bool: """解禁用户""" if operator_telegram_id is not None: admin_user = await self.must_get_user(operator_telegram_id) @@ -275,16 +292,19 @@ async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int] user.ban_time = 0 user.reason = "" await UserOrm().update( - {"ban_time": 0, "reason": None}, - conds=[User.id == user.id] + {"ban_time": 0, "reason": None}, conds=[User.id == user.id] ) return True except Exception as e: logger.error(f"解禁用户失败: {e}") return False - async def set_emby_config(self, telegram_id: int, register_public_user: Optional[int] = None, - register_public_time: Optional[int] = None) -> Config: + async def set_emby_config( + self, + telegram_id: int, + register_public_user: Optional[int] = None, + register_public_time: Optional[int] = None, + ) -> Config: """设置 Emby 注册相关配置,如公共注册名额和公共注册截止时间""" user = await self.must_get_user(telegram_id) user.check_set_emby_config() @@ -300,10 +320,10 @@ async def set_emby_config(self, telegram_id: int, register_public_user: Optional await ConfigOrm().update( values={ - 'register_public_user': emby_config.register_public_user, - 'register_public_time': emby_config.register_public_time + "register_public_user": emby_config.register_public_user, + "register_public_time": emby_config.register_public_time, }, - conds=[Config.id == 1] + conds=[Config.id == 1], ) return emby_config @@ -324,4 +344,4 @@ async def update_user_router(self, telegram_id: int, new_index: str) -> bool: async def get_router_list(self, telegram_id: int) -> List[Dict]: """获取所有可用线路""" await self.must_get_emby_user(telegram_id) - return self.emby_router_api.query_all_route() \ No newline at end of file + return self.emby_router_api.query_all_route() From d98a7338ed587d475e26282be2c16cf4e0bfb07b Mon Sep 17 00:00:00 2001 From: quby Date: Wed, 12 Feb 2025 20:48:41 +0800 Subject: [PATCH 3/6] fix(logger): update logger configuration for better IDE support - Changed logger to output to both terminal and log file - Added StreamHandler to output logs to terminal for easier debugging in IDEs - Retained FileHandler to write logs to default.log for persistence - Set logging level based on config.log_level --- app.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/app.py b/app.py index e4774b3..9d9d04b 100644 --- a/app.py +++ b/app.py @@ -15,6 +15,9 @@ # Initialize logger logger = logging.getLogger(__name__) +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) +logger.addHandler(console_handler) async def create_database_if_not_exists() -> None: @@ -51,22 +54,36 @@ async def _init_db() -> None: def _init_logger() -> None: """初始化日志记录器。""" - logging.basicConfig( - format="%(levelname)s [%(asctime)s] %(name)s - %(message)s", + handler = logging.StreamHandler() # 输出到终端 + formatter = logging.Formatter( + fmt="%(levelname)s [%(asctime)s] %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", - level=config.log_level, - filename="default.log", ) + handler.setFormatter(formatter) + + # 添加流处理器 + logger.addHandler(handler) + + # 设置日志级别 + logger.setLevel(config.log_level) + + # 如果你需要将日志写入文件,可以继续保持原有的文件配置: + file_handler = logging.FileHandler("default.log") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + def _init_tz() -> None: """初始化时区设置。""" if config.timezone: try: timezone = pytz.timezone(config.timezone) - now = datetime.now(timezone).strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now(timezone).strftime("%Y-%m-%d %H:%M:%S") logger.info(f"时区已设置为: {config.timezone},当前时间: {now}") except pytz.UnknownTimeZoneError: - logger.error(f"无效的时区配置: {config.timezone},请检查 config.timezone 设置。") + logger.error( + f"无效的时区配置: {config.timezone},请检查 config.timezone 设置。" + ) async def setup_bot() -> BotClient: @@ -94,7 +111,7 @@ async def main() -> None: _init_logger() _init_tz() - now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info(f"程序启动时间: {now}") await _init_db() From e3817f9df405d998ef8977fcebd3f971c5247237 Mon Sep 17 00:00:00 2001 From: quby Date: Thu, 13 Feb 2025 22:28:50 +0800 Subject: [PATCH 4/6] style(ruff): adjust ruff configuration to enforce PEP8 compliance - Set line-length to 79 to conform with PEP8 standards - Removed ignore for E501 to enable line-length checks - Ensured formatting aligns with PEP8 requirements --- app.py | 14 +++- bot/__init__.py | 8 ++- bot/bot_client.py | 17 ++--- bot/commands.py | 106 +++++++++++++++++++-------- bot/filters.py | 20 ++++-- bot/message_helper.py | 23 ++++-- bot/utils.py | 15 ++-- config.py | 7 +- core/emby_api.py | 140 +++++++++++++++++++++++++----------- models/__init__.py | 1 - models/config_model.py | 18 +++-- models/invite_code_model.py | 44 ++++++++---- models/user_model.py | 30 +++++--- ruff.toml | 12 ++-- services/user_service.py | 64 +++++++++++++---- 15 files changed, 367 insertions(+), 152 deletions(-) diff --git a/app.py b/app.py index 9d9d04b..61fd1f8 100644 --- a/app.py +++ b/app.py @@ -3,7 +3,11 @@ from datetime import datetime import pytz -from py_tools.connections.db.mysql import DBManager, BaseOrmTable, SQLAlchemyManager +from py_tools.connections.db.mysql import ( + DBManager, + BaseOrmTable, + SQLAlchemyManager, +) from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine @@ -100,7 +104,9 @@ async def setup_bot() -> BotClient: async def fetch_group_members(bot_client: BotClient) -> None: """获取群组成员并更新配置。""" - members_in_group = await bot_client.get_group_members(config.telegram_group_ids) + members_in_group = await bot_client.get_group_members( + config.telegram_group_ids + ) for group_members in members_in_group.values(): for telegram_id in group_members: config.group_members[telegram_id] = group_members[telegram_id] @@ -126,7 +132,9 @@ async def main() -> None: emby_router_api = EmbyRouterAPI(config.api_url, config.api_key) command_handler = CommandHandler( bot_client=bot_client, - user_service=UserService(emby_api=emby_api, emby_router_api=emby_router_api), + user_service=UserService( + emby_api=emby_api, emby_router_api=emby_router_api + ), ) logger.info("Emby API 和命令处理器初始化完成。") diff --git a/bot/__init__.py b/bot/__init__.py index b8652cf..ed247e9 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -2,7 +2,11 @@ from .bot_client import BotClient from .commands import CommandHandler -from .filters import user_in_group_on_filter, admin_user_on_filter, emby_user_on_filter +from .filters import ( + user_in_group_on_filter, + admin_user_on_filter, + emby_user_on_filter, +) from .message_helper import get_user_telegram_id from .utils import parse_iso8601_to_normal_date, parse_timestamp_to_normal_date @@ -18,4 +22,4 @@ "get_user_telegram_id", "parse_iso8601_to_normal_date", "parse_timestamp_to_normal_date", -] \ No newline at end of file +] diff --git a/bot/bot_client.py b/bot/bot_client.py index 7efbee4..971601f 100644 --- a/bot/bot_client.py +++ b/bot/bot_client.py @@ -7,17 +7,14 @@ class BotClient: def __init__( - self, - api_id: str, - api_hash: str, - bot_token: str, - name="emby_bot", + self, + api_id: str, + api_hash: str, + bot_token: str, + name="emby_bot", ): self.client = Client( - name=name, - api_id=api_id, - api_hash=api_hash, - bot_token=bot_token + name=name, api_id=api_id, api_hash=api_hash, bot_token=bot_token ) logger.info(f"Bot client initialized with name: {name}") @@ -41,4 +38,4 @@ async def idle(): def stop(self): logger.info("Stopping bot client") - return self.client.stop() \ No newline at end of file + return self.client.stop() diff --git a/bot/commands.py b/bot/commands.py index c861633..bc6781c 100644 --- a/bot/commands.py +++ b/bot/commands.py @@ -64,7 +64,8 @@ async def wrapper(self, message, *args, **kwargs): parsed_args = self._parse_args(message) if len(parsed_args) < min_len: await self._reply_html( - message, f"参数不足,请参考用法:\n{usage}" + message, + f"参数不足,请参考用法:\n{usage}", ) return # 将解析好的参数传递给目标函数,避免在函数内部再调用 _parse_args @@ -103,7 +104,9 @@ async def create_user(self, message: Message, args: list[str]): f"✅ 创建用户成功。\n初始密码:{default_password}", ) else: - await self._reply_html(message, "❌ 创建用户失败,请稍后重试。") + await self._reply_html( + message, "❌ 创建用户失败,请稍后重试。" + ) except Exception as e: await self._send_error(message, e, prefix="创建用户失败") @@ -112,7 +115,9 @@ async def info(self, message: Message): /info 如果是私聊,查看自己信息;如果群里回复某人,则查看对方信息 """ - telegram_id = await get_user_telegram_id(self.bot_client.client, message) + telegram_id = await get_user_telegram_id( + self.bot_client.client, message + ) try: user, emby_info = await self.user_service.emby_info(telegram_id) last_active = ( @@ -124,7 +129,9 @@ async def info(self, message: Message): emby_info.get("DateCreated", "") ) ban_status = ( - "正常" if (user.ban_time is None or user.ban_time == 0) else "已禁用" + "正常" + if (user.ban_time is None or user.ban_time == 0) + else "已禁用" ) reply_text = ( @@ -167,7 +174,9 @@ async def use_code(self, message: Message, args: list[str]): message, "✅ 邀请码使用成功,您已获得创建账号资格" ) else: - await self._reply_html(message, "✅ 邀请码使用成功,您已获得白名单资格") + await self._reply_html( + message, "✅ 邀请码使用成功,您已获得白名单资格" + ) # 如果该邀请码在bot中记录了消息,需要删除 if self.code_to_message_id.get(code): @@ -193,7 +202,9 @@ async def reset_emby_password(self, message: Message): f"✅ 密码重置成功。\n新密码:{default_password}", ) else: - await self._reply_html(message, "❌ 密码重置失败,请稍后重试。") + await self._reply_html( + message, "❌ 密码重置失败,请稍后重试。" + ) except Exception as e: await self._send_error(message, e, prefix="密码重置失败") @@ -217,7 +228,9 @@ async def new_code(self, message: Message): message.from_user.id, num ) for code_obj in code_list: - message_text = f"📌 邀请码:\n点击复制👉{code_obj.code}" + message_text = ( + f"📌 邀请码:\n点击复制👉{code_obj.code}" + ) if message.reply_to_message is not None: await self.bot_client.client.send_message( chat_id=message.from_user.id, @@ -232,7 +245,10 @@ async def new_code(self, message: Message): await self._reply_html(message, "✅ 已发送邀请码") else: msg = await self._reply_html(message, message_text) - self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id) + self.code_to_message_id[code_obj.code] = ( + message.chat.id, + msg.id, + ) except Exception as e: await self._send_error(message, e, prefix="创建邀请码失败") @@ -256,9 +272,7 @@ async def new_whitelist_code(self, message: Message): message.from_user.id, num ) for code_obj in code_list: - message_text = ( - f"📌 白名单邀请码:\n点击复制👉{code_obj.code}" - ) + message_text = f"📌 白名单邀请码:\n点击复制👉{code_obj.code}" if message.reply_to_message is not None: await self.bot_client.client.send_message( chat_id=message.from_user.id, @@ -273,7 +287,10 @@ async def new_whitelist_code(self, message: Message): await self._reply_html(message, "✅ 已发送邀请码") else: msg = await self._reply_html(message, message_text) - self.code_to_message_id[code_obj.code] = (message.chat.id, msg.id) + self.code_to_message_id[code_obj.code] = ( + message.chat.id, + msg.id, + ) except Exception as e: await self._send_error(message, e, prefix="创建白名单邀请码失败") @@ -285,11 +302,16 @@ async def ban_emby(self, message: Message): reason = args[0] if args else "管理员禁用" operator_id = message.from_user.id - telegram_id = await get_user_telegram_id(self.bot_client.client, message) + telegram_id = await get_user_telegram_id( + self.bot_client.client, message + ) try: - if await self.user_service.emby_ban(telegram_id, reason, operator_id): + if await self.user_service.emby_ban( + telegram_id, reason, operator_id + ): await self._reply_html( - message, f"✅ 已禁用用户 {telegram_id} 的Emby账号" + message, + f"✅ 已禁用用户 {telegram_id} 的Emby账号", ) else: await self._reply_html(message, "❌ 禁用失败,请稍后重试。") @@ -301,11 +323,14 @@ async def unban_emby(self, message: Message): /unban_emby (群里需回复某人或手动指定) """ operator_id = message.from_user.id - telegram_id = await get_user_telegram_id(self.bot_client.client, message) + telegram_id = await get_user_telegram_id( + self.bot_client.client, message + ) try: if await self.user_service.emby_unban(telegram_id, operator_id): await self._reply_html( - message, f"✅ 已解禁用户 {telegram_id} 的Emby账号" + message, + f"✅ 已解禁用户 {telegram_id} 的Emby账号", ) else: await self._reply_html(message, "❌ 解禁失败,请稍后重试。") @@ -319,8 +344,9 @@ async def select_line(self, message: Message): """ try: telegram_id = message.from_user.id - router_list = config.router_list or await self.user_service.get_router_list( - telegram_id + router_list = ( + config.router_list + or await self.user_service.get_router_list(telegram_id) ) # 缓存到 config 中,减少重复获取 if router_list and not config.router_list: @@ -328,7 +354,9 @@ async def select_line(self, message: Message): user_router = await self.user_service.get_user_router(telegram_id) user_router_index = user_router.get("index", "") - message_text = f"当前线路:{user_router_index}\n请选择线路:" + message_text = ( + f"当前线路:{user_router_index}\n请选择线路:" + ) message_buttons = [] for router in router_list: @@ -336,7 +364,9 @@ async def select_line(self, message: Message): name = router.get("name") # 已选线路高亮 button_text = ( - f"🔵 {name}" if index == user_router_index else f"⚪ {name}" + f"🔵 {name}" + if index == user_router_index + else f"⚪ {name}" ) message_buttons.append( [ @@ -347,7 +377,9 @@ async def select_line(self, message: Message): ) keyboard = InlineKeyboardMarkup(message_buttons) - await self._reply_html(message, message_text, reply_markup=keyboard) + await self._reply_html( + message, message_text, reply_markup=keyboard + ) except Exception as e: await self._send_error(message, e, prefix="查询失败") @@ -371,7 +403,9 @@ async def group_member_change_handler(self, clent, message: Message): for new_member in message.new_chat_members: config.group_members[new_member.id] = new_member - async def handle_callback_query(self, client, callback_query: CallbackQuery): + async def handle_callback_query( + self, client, callback_query: CallbackQuery + ): """ 回调按钮事件统一处理,如切换线路。 """ @@ -384,7 +418,8 @@ async def handle_callback_query(self, client, callback_query: CallbackQuery): return selected_router = next( - (r for r in config.router_list if r["index"] == index), None + (r for r in config.router_list if r["index"] == index), + None, ) if not selected_router: await callback_query.answer("线路不存在") @@ -399,7 +434,9 @@ async def handle_callback_query(self, client, callback_query: CallbackQuery): "生效可能会有 30 秒延迟,请耐心等候。" ) except Exception as e: - await callback_query.answer(f"操作失败:{str(e)}", show_alert=True) + await callback_query.answer( + f"操作失败:{str(e)}", show_alert=True + ) logger.error(f"Callback query failed: {e}", exc_info=True) async def count(self, message: Message): @@ -410,7 +447,9 @@ async def count(self, message: Message): try: count_data = self.user_service.emby_count() if not count_data: - return await self._reply_html(message, "❌ 查询失败:无法获取数据") + return await self._reply_html( + message, "❌ 查询失败:无法获取数据" + ) await self._reply_html( message, @@ -435,10 +474,13 @@ async def register_until(self, message: Message, args: list[str]): time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") now = datetime.now() if time < now: - return await self._reply_html(message, "❌ 时间必须晚于当前时间") + return await self._reply_html( + message, "❌ 时间必须晚于当前时间" + ) await self.user_service.set_emby_config( - message.from_user.id, register_public_time=int(time.timestamp()) + message.from_user.id, + register_public_time=int(time.timestamp()), ) await self._reply_html( message, f"✅ 已开放注册,截止时间:{time_str}" @@ -513,13 +555,17 @@ async def c_info(client, message): await self.info(message) @self.bot_client.client.on_message( - filters.private & filters.command("use_code") & user_in_group_on_filter + filters.private + & filters.command("use_code") + & user_in_group_on_filter ) async def c_use_code(client, message): await self.use_code(message) @self.bot_client.client.on_message( - filters.private & filters.command("create") & user_in_group_on_filter + filters.private + & filters.command("create") + & user_in_group_on_filter ) async def c_create_user(client, message): await self.create_user(message) diff --git a/bot/filters.py b/bot/filters.py index a9319fb..5f439ac 100644 --- a/bot/filters.py +++ b/bot/filters.py @@ -14,7 +14,7 @@ async def user_in_group_on_filter(filter, client, update) -> bool: if config.group_members and telegram_id in config.group_members: logger.debug(f"User {telegram_id} is in group") return True - if config.channel_members and telegram_id in config.channel_members: + if config.channel_members and telegram_id in config.channel_members: logger.debug(f"User {telegram_id} is in channel") return True @@ -31,7 +31,10 @@ async def admin_user_on_filter(filter, client, update) -> bool: logger.debug(f"User {telegram_id} is an admin") return True except Exception as e: - logger.error(f"Error checking admin status for user {telegram_id}: {e}", exc_info=True) + logger.error( + f"Error checking admin status for user {telegram_id}: {e}", + exc_info=True, + ) return False logger.debug(f"User {telegram_id} is not an admin") @@ -47,13 +50,18 @@ async def emby_user_on_filter(filter, client, update) -> bool: logger.debug(f"User {telegram_id} is an Emby user") return True except Exception as e: - logger.error(f"Error checking Emby status for user {telegram_id}: {e}", exc_info=True) + logger.error( + f"Error checking Emby status for user {telegram_id}: {e}", + exc_info=True, + ) return False logger.debug(f"User {telegram_id} is not an Emby user") return False -user_in_group_on_filter = create(user_in_group_on_filter, 'user_in_group_on_filter') -admin_user_on_filter = create(admin_user_on_filter, 'admin_user_on_filter') -emby_user_on_filter = create(emby_user_on_filter, 'emby_user_on_filter') \ No newline at end of file +user_in_group_on_filter = create( + user_in_group_on_filter, "user_in_group_on_filter" +) +admin_user_on_filter = create(admin_user_on_filter, "admin_user_on_filter") +emby_user_on_filter = create(emby_user_on_filter, "emby_user_on_filter") diff --git a/bot/message_helper.py b/bot/message_helper.py index 54f5c57..d250f2b 100644 --- a/bot/message_helper.py +++ b/bot/message_helper.py @@ -24,19 +24,25 @@ async def get_user_telegram_id(client, message): # 直接提供 Telegram ID(纯数字) if telegram_str.isdigit(): telegram_id = int(telegram_str) - logger.debug(f"Telegram ID from arguments (numeric): {telegram_id}") + logger.debug( + f"Telegram ID from arguments (numeric): {telegram_id}" + ) # 使用 @username elif telegram_str.startswith("@"): telegram_username = telegram_str[1:] # 去掉 `@` - logger.debug(f"Telegram username from arguments: {telegram_username}") + logger.debug( + f"Telegram username from arguments: {telegram_username}" + ) # 通过用户名查找 ID if telegram_username: try: user = await client.get_users(telegram_username) telegram_id = user.id - logger.debug(f"Telegram ID resolved from username {telegram_username}: {telegram_id}") + logger.debug( + f"Telegram ID resolved from username {telegram_username}: {telegram_id}" + ) except UsernameNotOccupied: error_message = f"❌ 用户名 @{telegram_username} 不存在" logger.warning(f"Username not occupied: {telegram_username}") @@ -44,11 +50,16 @@ async def get_user_telegram_id(client, message): return None except PeerIdInvalid: error_message = f"❌ 无法获取用户 @{telegram_username} 的 ID" - logger.warning(f"Peer ID invalid for username: {telegram_username}") + logger.warning( + f"Peer ID invalid for username: {telegram_username}" + ) await message.reply(error_message) return None except Exception as e: - logger.error(f"Error getting user ID from username {telegram_username}: {e}", exc_info=True) + logger.error( + f"Error getting user ID from username {telegram_username}: {e}", + exc_info=True, + ) return None - return telegram_id \ No newline at end of file + return telegram_id diff --git a/bot/utils.py b/bot/utils.py index f5698da..8ac3804 100644 --- a/bot/utils.py +++ b/bot/utils.py @@ -7,11 +7,16 @@ def parse_iso8601(datetime_str: str): # 解析字符串为 datetime 对象 try: - dt = datetime.strptime(datetime_str[:26], "%Y-%m-%dT%H:%M:%S.%f") # 截取到微秒部分 + dt = datetime.strptime( + datetime_str[:26], "%Y-%m-%dT%H:%M:%S.%f" + ) # 截取到微秒部分 logger.debug(f"Parsed ISO8601 datetime string: {datetime_str}") return dt except Exception as e: - logger.error(f"Error parsing ISO8601 datetime string: {datetime_str}: {e}", exc_info=True) + logger.error( + f"Error parsing ISO8601 datetime string: {datetime_str}: {e}", + exc_info=True, + ) return None @@ -35,5 +40,7 @@ def parse_timestamp_to_normal_date(timestamp: int): logger.debug(f"Parsed timestamp: {timestamp}") return dt.strftime("%Y-%m-%d %H:%M:%S") except Exception as e: - logger.error(f"Error parsing timestamp {timestamp}: {e}", exc_info=True) - return None \ No newline at end of file + logger.error( + f"Error parsing timestamp {timestamp}: {e}", exc_info=True + ) + return None diff --git a/config.py b/config.py index 42f623c..3290b57 100644 --- a/config.py +++ b/config.py @@ -7,6 +7,7 @@ logger = logging.getLogger(__name__) + class Config: def __init__(self): self.timezone = os.getenv("TIMEZONE") @@ -14,7 +15,9 @@ def __init__(self): self.bot_token = os.getenv("BOT_TOKEN") self.api_id = os.getenv("API_ID") self.api_hash = os.getenv("API_HASH") - self.telegram_group_ids = list(map(int, os.getenv("TELEGRAM_GROUP_ID").split(","))) + self.telegram_group_ids = list( + map(int, os.getenv("TELEGRAM_GROUP_ID").split(",")) + ) self.emby_url = os.getenv("EMBY_URL") self.emby_api = os.getenv("EMBY_API_KEY") self.api_url = os.getenv("API_URL") @@ -34,4 +37,4 @@ def __init__(self): # 实例化并提供配置对象 config = Config() -logger.debug(f"Admin list: {config.admin_list}") \ No newline at end of file +logger.debug(f"Admin list: {config.admin_list}") diff --git a/core/emby_api.py b/core/emby_api.py index 2a64611..1b325c4 100644 --- a/core/emby_api.py +++ b/core/emby_api.py @@ -15,10 +15,12 @@ def __init__(self, emby_url: str, emby_api: str, timeout: int = 10): :param emby_api: Emby 服务器的 API Key :param timeout: 每次请求的超时时间,默认为 10 秒 """ - self.base_url: str = emby_url.rstrip('/') + self.base_url: str = emby_url.rstrip("/") self.api_key: str = emby_api self.timeout: int = timeout - logger.info(f"EmbyApi initialized with URL: {self.base_url}, timeout: {self.timeout}") + logger.info( + f"EmbyApi initialized with URL: {self.base_url}, timeout: {self.timeout}" + ) def _request(self, method: str, path: str, data=None, params=None): """ @@ -35,16 +37,28 @@ def _request(self, method: str, path: str, data=None, params=None): "Authorization": f"Token={self.api_key}", "X-Emby-Authorization": f"Token={self.api_key}", "User-Agent": "sadasd", - "Accept-Language": "zh-CN,zh-Hans;q=0.9", "Content-Type": "application/json", "Accept": "*/*" + "Accept-Language": "zh-CN,zh-Hans;q=0.9", + "Content-Type": "application/json", + "Accept": "*/*", } url = f"{self.base_url}{path}" - logger.debug(f"Making {method} request to {url} with params: {params}, data: {data}") + logger.debug( + f"Making {method} request to {url} with params: {params}, data: {data}" + ) try: - if method.upper() == 'GET': - response = requests.get(url, params=params, timeout=self.timeout, headers=headers) - elif method.upper() == 'POST': - response = requests.post(url, params=params, json=data, timeout=self.timeout, headers=headers) + if method.upper() == "GET": + response = requests.get( + url, params=params, timeout=self.timeout, headers=headers + ) + elif method.upper() == "POST": + response = requests.post( + url, + params=params, + json=data, + timeout=self.timeout, + headers=headers, + ) else: raise Exception(f"暂不支持的 HTTP 方法: {method}") @@ -54,16 +68,23 @@ def _request(self, method: str, path: str, data=None, params=None): raise Exception("请求 Emby 服务器超时,请稍后重试或检查网络连接。") except requests.exceptions.ConnectionError as e: # 连接异常 - logger.error(f"Failed to connect to Emby server: {e}", exc_info=True) + logger.error( + f"Failed to connect to Emby server: {e}", exc_info=True + ) raise Exception(f"无法连接到 Emby 服务器: {str(e)}") except requests.exceptions.RequestException as e: # 其他 requests 异常 - logger.error(f"An unknown error occurred while requesting Emby: {e}", exc_info=True) + logger.error( + f"An unknown error occurred while requesting Emby: {e}", + exc_info=True, + ) raise Exception(f"请求 Emby 时发生未知错误: {str(e)}") try: response.raise_for_status() - logger.debug(f"Request successful, status code: {response.status_code}") + logger.debug( + f"Request successful, status code: {response.status_code}" + ) except Exception as e: logger.error(f"Emby API request failed: {e}", exc_info=True) raise Exception(f"Emby API 请求失败") @@ -78,9 +99,12 @@ def get_user(self, emby_id: str): path = f"/emby/Users/{emby_id}" logger.info(f"Getting user with Emby ID: {emby_id}") try: - return self._request('GET', path) + return self._request("GET", path) except Exception as e: - logger.error(f"Failed to get user with Emby ID {emby_id}: {e}", exc_info=True) + logger.error( + f"Failed to get user with Emby ID {emby_id}: {e}", + exc_info=True, + ) raise def create_user(self, name: str): @@ -93,9 +117,11 @@ def create_user(self, name: str): data = {"Name": name, "HasPassword": False} logger.info(f"Creating user with name: {name}") try: - return self._request('POST', path, data=data) + return self._request("POST", path, data=data) except Exception as e: - logger.error(f"Failed to create user with name {name}: {e}", exc_info=True) + logger.error( + f"Failed to create user with name {name}: {e}", exc_info=True + ) raise def ban_user(self, emby_id: str): @@ -126,13 +152,16 @@ def ban_user(self, emby_id: str): "EnableMediaConversion": False, "EnableAllDevices": True, "AllowCameraUpload": False, - "SimultaneousStreamLimit": 0 + "SimultaneousStreamLimit": 0, } logger.info(f"Banning user with Emby ID: {emby_id}") try: return self.update_user_policy(emby_id, data) except Exception as e: - logger.error(f"Failed to ban user with Emby ID {emby_id}: {e}", exc_info=True) + logger.error( + f"Failed to ban user with Emby ID {emby_id}: {e}", + exc_info=True, + ) raise def set_default_policy(self, emby_id: str): @@ -163,13 +192,16 @@ def set_default_policy(self, emby_id: str): "EnableMediaConversion": False, "EnableAllDevices": True, "AllowCameraUpload": False, - "SimultaneousStreamLimit": 3 + "SimultaneousStreamLimit": 3, } logger.info(f"Setting default policy for user with Emby ID: {emby_id}") try: return self.update_user_policy(emby_id, data) except Exception as e: - logger.error(f"Failed to set default policy for user with Emby ID {emby_id}: {e}", exc_info=True) + logger.error( + f"Failed to set default policy for user with Emby ID {emby_id}: {e}", + exc_info=True, + ) raise def update_user_policy(self, emby_id: str, policy_data: dict): @@ -180,11 +212,16 @@ def update_user_policy(self, emby_id: str, policy_data: dict): :return: 成功返回更新后的用户信息 JSON,失败抛出异常 """ path = f"/emby/Users/{emby_id}/Policy" - logger.info(f"Updating user policy for Emby ID: {emby_id} with data: {policy_data}") + logger.info( + f"Updating user policy for Emby ID: {emby_id} with data: {policy_data}" + ) try: - return self._request('POST', path, data=policy_data) + return self._request("POST", path, data=policy_data) except Exception as e: - logger.error(f"Failed to update user policy for Emby ID {emby_id}: {e}", exc_info=True) + logger.error( + f"Failed to update user policy for Emby ID {emby_id}: {e}", + exc_info=True, + ) raise def reset_user_password(self, emby_id: str): @@ -197,9 +234,12 @@ def reset_user_password(self, emby_id: str): data = {"ResetPassword": True} logger.info(f"Resetting password for user with Emby ID: {emby_id}") try: - return self._request('POST', path, data=data) + return self._request("POST", path, data=data) except Exception as e: - logger.error(f"Failed to reset password for user with Emby ID {emby_id}: {e}", exc_info=True) + logger.error( + f"Failed to reset password for user with Emby ID {emby_id}: {e}", + exc_info=True, + ) raise def set_user_password(self, emby_id: str, new_pass: str): @@ -213,9 +253,12 @@ def set_user_password(self, emby_id: str, new_pass: str): data = {"ResetPassword": False, "CurrentPw": "", "NewPw": new_pass} logger.info(f"Setting password for user with Emby ID: {emby_id}") try: - return self._request('POST', path, data=data) + return self._request("POST", path, data=data) except Exception as e: - logger.error(f"Failed to set password for user with Emby ID {emby_id}: {e}", exc_info=True) + logger.error( + f"Failed to set password for user with Emby ID {emby_id}: {e}", + exc_info=True, + ) raise def check_emby_site(self) -> bool: @@ -226,7 +269,7 @@ def check_emby_site(self) -> bool: path = "/emby/System/Info" logger.info("Checking Emby site availability") try: - self._request('GET', path) + self._request("GET", path) return True except Exception as e: logger.warning(f"Emby site check failed: {e}", exc_info=True) @@ -242,7 +285,7 @@ def count(self): path = "/emby/Items/Counts" logger.info("Getting Emby item counts") try: - return self._request('GET', path) + return self._request("GET", path) except Exception as e: logger.error(f"Failed to get Emby item counts: {e}", exc_info=True) raise @@ -259,10 +302,12 @@ def __init__(self, api_url: str, api_key: str = "", timeout: int = 10): :param api_key: 路由服务使用的Token(如果需要鉴权) :param timeout: 请求超时,默认为10秒 """ - self.api_url = api_url.rstrip('/') + self.api_url = api_url.rstrip("/") self.api_key = api_key self.timeout = timeout - logger.info(f"EmbyRouterAPI initialized with URL: {self.api_url}, timeout: {self.timeout}") + logger.info( + f"EmbyRouterAPI initialized with URL: {self.api_url}, timeout: {self.timeout}" + ) def call_api(self, path: str): """ @@ -271,7 +316,9 @@ def call_api(self, path: str): :return: 成功时返回 JSON,失败抛出异常 """ url = f"{self.api_url}{path}" - headers = {'Authorization': f'Bearer {self.api_key}'} if self.api_key else {} + headers = ( + {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {} + ) logger.debug(f"Calling API at {url}") try: response = requests.get(url, headers=headers, timeout=self.timeout) @@ -281,10 +328,15 @@ def call_api(self, path: str): logger.error("Request to router service timed out", exc_info=True) raise Exception("请求路由服务超时,请稍后重试或检查网络连接。") except requests.exceptions.ConnectionError as e: - logger.error(f"Failed to connect to router service: {e}", exc_info=True) + logger.error( + f"Failed to connect to router service: {e}", exc_info=True + ) raise Exception(f"无法连接到路由服务: {str(e)}") except requests.exceptions.RequestException as e: - logger.error(f"An unknown error occurred while requesting router service: {e}", exc_info=True) + logger.error( + f"An unknown error occurred while requesting router service: {e}", + exc_info=True, + ) raise Exception(f"请求路由服务时发生错误: {str(e)}") def query_all_route(self): @@ -293,7 +345,7 @@ def query_all_route(self): """ logger.info("Querying all routes") try: - return self.call_api('/api/route') + return self.call_api("/api/route") except Exception as e: logger.error(f"Failed to query all routes: {e}", exc_info=True) raise @@ -304,18 +356,26 @@ def query_user_route(self, user_id: str): """ logger.info(f"Querying user route for user ID: {user_id}") try: - return self.call_api(f'/api/route/{user_id}') + return self.call_api(f"/api/route/{user_id}") except Exception as e: - logger.error(f"Failed to query user route for user ID {user_id}: {e}", exc_info=True) + logger.error( + f"Failed to query user route for user ID {user_id}: {e}", + exc_info=True, + ) raise def update_user_route(self, user_id: str, new_index: str): """ 更新用户当前所使用的线路。 """ - logger.info(f"Updating user route for user ID: {user_id} to index: {new_index}") + logger.info( + f"Updating user route for user ID: {user_id} to index: {new_index}" + ) try: - return self.call_api(f'/api/route/{user_id}/{new_index}') + return self.call_api(f"/api/route/{user_id}/{new_index}") except Exception as e: - logger.error(f"Failed to update user route for user ID {user_id} to index {new_index}: {e}", exc_info=True) - raise \ No newline at end of file + logger.error( + f"Failed to update user route for user ID {user_id} to index {new_index}: {e}", + exc_info=True, + ) + raise diff --git a/models/__init__.py b/models/__init__.py index 09b0f34..d128db6 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,4 +1,3 @@ from .user_model import User from .config_model import Config from .invite_code_model import InviteCode - diff --git a/models/config_model.py b/models/config_model.py index 7b89ad1..e2a7db4 100644 --- a/models/config_model.py +++ b/models/config_model.py @@ -6,15 +6,23 @@ logger = logging.getLogger(__name__) + class Config(BaseOrmTableWithTS): - __tablename__ = 'config' + __tablename__ = "config" - total_register_user: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - register_public_user: Mapped[int] = mapped_column(Integer, nullable=False, default=0) - register_public_time: Mapped[int] = mapped_column(BigInteger, nullable=True) + total_register_user: Mapped[int] = mapped_column( + Integer, nullable=False, default=0 + ) + register_public_user: Mapped[int] = mapped_column( + Integer, nullable=False, default=0 + ) + register_public_time: Mapped[int] = mapped_column( + BigInteger, nullable=True + ) class ConfigOrm(DBManager): orm_table = Config -logger.info("Config model initialized") \ No newline at end of file + +logger.info("Config model initialized") diff --git a/models/invite_code_model.py b/models/invite_code_model.py index 4860201..694a65e 100644 --- a/models/invite_code_model.py +++ b/models/invite_code_model.py @@ -7,31 +7,47 @@ logger = logging.getLogger(__name__) + class InviteCodeType(enum.Enum): - REGISTER = 'register' # 注册邀请码 - WHITELIST = 'whitelist' # 白名单邀请码 + REGISTER = "register" # 注册邀请码 + WHITELIST = "whitelist" # 白名单邀请码 def __str__(self): return self.value class InviteCode(BaseOrmTableWithTS): - __tablename__ = 'invite_code' - - code: Mapped[str] = mapped_column(String(50), index=True, unique=True, nullable=False) - telegram_id: Mapped[int] = mapped_column(BigInteger, index=True, nullable=False) - code_type: Mapped[InviteCodeType] = mapped_column(Enum(InviteCodeType), nullable=False) - is_used: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) - used_time: Mapped[int] = mapped_column(BigInteger, default=None, nullable=True) - used_user_id: Mapped[int] = mapped_column(BigInteger, default=None, nullable=True, index=True) + __tablename__ = "invite_code" + + code: Mapped[str] = mapped_column( + String(50), index=True, unique=True, nullable=False + ) + telegram_id: Mapped[int] = mapped_column( + BigInteger, index=True, nullable=False + ) + code_type: Mapped[InviteCodeType] = mapped_column( + Enum(InviteCodeType), nullable=False + ) + is_used: Mapped[bool] = mapped_column( + Boolean, default=False, nullable=False + ) + used_time: Mapped[int] = mapped_column( + BigInteger, default=None, nullable=True + ) + used_user_id: Mapped[int] = mapped_column( + BigInteger, default=None, nullable=True, index=True + ) def __repr__(self): - return (f"") + return ( + f"" + ) class InviteCodeOrm(DBManager): orm_table = InviteCode -logger.info("InviteCode model initialized") \ No newline at end of file + +logger.info("InviteCode model initialized") diff --git a/models/user_model.py b/models/user_model.py index 21cda65..92acbb9 100644 --- a/models/user_model.py +++ b/models/user_model.py @@ -8,16 +8,27 @@ logger = logging.getLogger(__name__) + class User(BaseOrmTableWithTS): - __tablename__ = 'user' + __tablename__ = "user" - telegram_id: Mapped[int] = mapped_column(BigInteger, index=True, unique=True, nullable=False) + telegram_id: Mapped[int] = mapped_column( + BigInteger, index=True, unique=True, nullable=False + ) telegram_name: Mapped[str] = mapped_column(String(100), nullable=True) emby_name: Mapped[str] = mapped_column(String(50), nullable=True) - emby_id: Mapped[str] = mapped_column(String(50), index=True, unique=True, nullable=True) - is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) - is_whitelist: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) - enable_register: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + emby_id: Mapped[str] = mapped_column( + String(50), index=True, unique=True, nullable=True + ) + is_admin: Mapped[bool] = mapped_column( + Boolean, default=False, nullable=False + ) + is_whitelist: Mapped[bool] = mapped_column( + Boolean, default=False, nullable=False + ) + enable_register: Mapped[bool] = mapped_column( + Boolean, default=False, nullable=False + ) ban_time: Mapped[int] = mapped_column(nullable=True) reason: Mapped[str] = mapped_column(String(100), nullable=True) @@ -56,7 +67,9 @@ def check_use_redeem_code(self) -> None: if self.emby_id is not None: raise Exception("该用户已拥有 Emby 账号,无法再次使用注册邀请码。") if self.enable_register: - raise Exception("该用户已经具备创建 Emby 账号的资格,无需再次使用邀请码。") + raise Exception( + "该用户已经具备创建 Emby 账号的资格,无需再次使用邀请码。" + ) def check_use_whitelist_code(self) -> None: """检查是否可使用白名单邀请码。""" @@ -104,4 +117,5 @@ def emby_ban_info(self) -> tuple[int, str]: class UserOrm(DBManager): orm_table = User -logger.info("User model initialized") \ No newline at end of file + +logger.info("User model initialized") diff --git a/ruff.toml b/ruff.toml index 17bd4bc..504147a 100644 --- a/ruff.toml +++ b/ruff.toml @@ -28,8 +28,8 @@ exclude = [ "site-packages", ] -# 格式化相关设置,保持与 Black 风格一致 -line-length = 88 +# 格式化相关设置,保持与 PEP8 风格一致 +line-length = 79 indent-width = 4 # 指定目标 Python 版本 @@ -39,8 +39,8 @@ target-version = "py312" # 启用部分 pycodestyle(E4、E7、E9)、Pyflakes(F),同时增加 bugbear(B)和 flake8-quotes(Q)规则 select = ["E4", "E7", "E9", "F", "B", "Q"] -# 如果您不希望行长检查导致过多提示,可选择忽略 E501 -ignore = ["E501"] +# 由于必须严格遵照 PEP8,此处移除了对 E501(行长检查)的忽略 +# ignore = ["E501"] # 默认允许自动修复所有启用的规则,但 bugbear(B)的规则风险较高,因此将其列入 unfixable,避免自动修改 fixable = ["ALL"] @@ -59,7 +59,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" docstring-quotes = "double" [format] -# 使用双引号(与 Black 一致) +# 使用双引号 quote-style = "double" # 使用空格缩进,不使用制表符 indent-style = "space" @@ -71,4 +71,4 @@ line-ending = "auto" # Docstring 内代码示例的自动格式化(目前处于实验阶段,如不需要可保持关闭) docstring-code-format = false # 当 docstring 格式化启用时,动态计算代码片段的行长 -docstring-code-line-length = "dynamic" \ No newline at end of file +docstring-code-line-length = "dynamic" diff --git a/services/user_service.py b/services/user_service.py index c460761..dddeda5 100644 --- a/services/user_service.py +++ b/services/user_service.py @@ -28,12 +28,16 @@ def __init__(self, emby_api: EmbyApi, emby_router_api: EmbyRouterAPI): @staticmethod async def get_or_create_user_by_telegram_id(telegram_id: int) -> User: """通过 telegram_id 从数据库获取用户,如果不存在则创建一个默认用户""" - user = await UserOrm().query_one(conds=[User.telegram_id == telegram_id]) + user = await UserOrm().query_one( + conds=[User.telegram_id == telegram_id] + ) if not user: default_user = User( telegram_id=telegram_id, is_admin=telegram_id in config.admin_list, - telegram_name=config.group_members.get(telegram_id, {}).username + telegram_name=config.group_members.get( + telegram_id, {} + ).username if config.group_members.get(telegram_id) else None, ) @@ -71,7 +75,9 @@ async def _emby_create_user( user = await self.get_or_create_user_by_telegram_id(telegram_id) emby_user = self.emby_api.create_user(username) if not emby_user or not emby_user.get("Id"): - raise Exception("在 Emby 系统中创建账号失败,请检查 Emby 服务是否正常。") + raise Exception( + "在 Emby 系统中创建账号失败,请检查 Emby 服务是否正常。" + ) emby_id = emby_user["Id"] user.emby_id = emby_id @@ -108,7 +114,9 @@ async def create_invite_code( code_objs = [ InviteCode( - code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER + code=code, + telegram_id=telegram_id, + code_type=InviteCodeType.REGISTER, ) for code in self.gen_register_code(count) ] @@ -124,7 +132,9 @@ async def create_whitelist_code( code_objs = [ InviteCode( - code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST + code=code, + telegram_id=telegram_id, + code_type=InviteCodeType.WHITELIST, ) for code in self.gen_whitelist_code(count) ] @@ -147,7 +157,9 @@ async def first_or_create_emby_config(self) -> Config: emby_config = await ConfigOrm().query_one(conds=[Config.id == 1]) if not emby_config: emby_config = Config( - register_public_user=0, register_public_time=0, total_register_user=0 + register_public_user=0, + register_public_time=0, + total_register_user=0, ) await ConfigOrm().add(emby_config) return emby_config @@ -158,7 +170,9 @@ async def emby_create_user( """创建 Emby 用户(外部调用入口),先判断各种配置是否允许注册,然后调用内部的 _emby_create_user""" user = await self.get_or_create_user_by_telegram_id(telegram_id) if user.has_emby_account(): - raise Exception("该 Telegram 用户已经绑定过 Emby 账号,无法重复创建。") + raise Exception( + "该 Telegram 用户已经绑定过 Emby 账号,无法重复创建。" + ) emby_config = await self.first_or_create_emby_config() if not emby_config: @@ -168,18 +182,25 @@ async def emby_create_user( raise Exception("当前没有可用的注册权限或名额,创建账号被拒绝。") async with ConfigOrm().transaction() as session: - if not user.enable_register and emby_config.register_public_user > 0: + if ( + not user.enable_register + and emby_config.register_public_user > 0 + ): emby_config.register_public_user -= 1 emby_config.total_register_user += 1 - new_user = await self._emby_create_user(telegram_id, username, password) + new_user = await self._emby_create_user( + telegram_id, username, password + ) session.add(new_user) session.add(emby_config) await session.commit() return new_user - async def _check_register_permission(self, user: User, emby_config: Config) -> bool: + async def _check_register_permission( + self, user: User, emby_config: Config + ) -> bool: """检查用户是否有权限注册 Emby 账号""" enable_register = user.enable_register if not enable_register and emby_config.register_public_user > 0: @@ -207,7 +228,11 @@ async def redeem_code(self, telegram_id: int, code: str): # 使用事务块,并通过行锁防止并发问题 async with InviteCodeOrm().transaction() as session: # 构造 SELECT 语句,并加上 FOR UPDATE 行锁 - stmt = select(InviteCode).where(InviteCode.code == code).with_for_update() + stmt = ( + select(InviteCode) + .where(InviteCode.code == code) + .with_for_update() + ) result = await session.execute(stmt) valid_code = result.scalars().first() @@ -239,7 +264,9 @@ async def redeem_code(self, telegram_id: int, code: str): return valid_code - async def reset_password(self, telegram_id: int, password: str = "") -> bool: + async def reset_password( + self, telegram_id: int, password: str = "" + ) -> bool: """重置用户的 Emby 密码。""" user = await self.must_get_emby_user(telegram_id) try: @@ -251,7 +278,10 @@ async def reset_password(self, telegram_id: int, password: str = "") -> bool: return False async def emby_ban( - self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None + self, + telegram_id: int, + reason: str, + operator_telegram_id: Optional[int] = None, ) -> bool: """禁用用户""" if operator_telegram_id is not None: @@ -336,10 +366,14 @@ async def get_user_router(self, telegram_id: int) -> Dict: user = await self.must_get_emby_user(telegram_id) return self.emby_router_api.query_user_route(user.emby_id) - async def update_user_router(self, telegram_id: int, new_index: str) -> bool: + async def update_user_router( + self, telegram_id: int, new_index: str + ) -> bool: """更新用户线路信息""" user = await self.must_get_emby_user(telegram_id) - return self.emby_router_api.update_user_route(str(user.emby_id), str(new_index)) + return self.emby_router_api.update_user_route( + str(user.emby_id), str(new_index) + ) async def get_router_list(self, telegram_id: int) -> List[Dict]: """获取所有可用线路""" From 11595bb747a57dde1b6b7283f58855be8001c97e Mon Sep 17 00:00:00 2001 From: quby Date: Fri, 14 Feb 2025 22:19:26 +0800 Subject: [PATCH 5/6] chore(docs): remove readme.md to rename with uppercase filename --- readme.md | 80 ------------------------------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 readme.md diff --git a/readme.md b/readme.md deleted file mode 100644 index 382e4d0..0000000 --- a/readme.md +++ /dev/null @@ -1,80 +0,0 @@ -## Emby Bot 项目 -本项目是一个基于 Pyrogram 的 Telegram Bot,用于管理 Emby 用户、发送邀请码、查询 Emby 资源等操作。同时集成了针对 Emby 路由服务的切换功能。项目主要通过 Python + SQLAlchemy + Pyrogram 实现。 - -### 功能概述 -#### 用户管理: -- 根据邀请码创建 Emby 用户,并分配默认密码、默认策略等。 -- 提供管理员命令禁用/解禁用户的 Emby 账号。 -- 可查看用户当前信息(白名单、管理员身份、禁用状态等)。 -#### 邀请码管理: -- 生成普通邀请码、白名单邀请码。 -- 使用邀请码后自动更新数据库和相关标识。 -#### 线路管理: -- 集成路由服务 API,允许用户在机器人对话中快速切换观影线路。 -#### 其他辅助功能: -- 查看当前 Emby 影片数量。 -- 限时或限量开放注册。 - -### 安装及运行 -```bash -git clone https://github.com/embyplus/embyBot -cp .env.example .env -vim .env - -python3 -m pip install -r requirements.txt -python3 app.py -``` - -### 配置环境变量 - -| 变量名 | 说明 | 示例值 | -|-------------------|---------------------------------------------------|----------------------------| -| TIMEZONE | 时区设置 | Asia/Shanghai | - | LOG_LEVEL | 日志级别,可选 DEBUG / INFO / WARNING / ERROR / CRITICAL | INFO | - | BOT_TOKEN | 你的 Telegram Bot 令牌 | 123456:ABC-DEF1234ghIkl... | - | API_ID | Telegram API ID(从 my.telegram.org 获取) | 1234567 | - | API_HASH | Telegram API Hash | abcdef1234567890ghijklmn | - | TELEGRAM_GROUP_ID | Bot 要监听或管理的群组 ID,支持多群可用逗号分隔 | -1001234567890 | - | EMBY_URL | Emby 服务器 URL | https://your-emby-url | - | EMBY_API_KEY | Emby 服务器 API Key | embyapikey123 | - | API_URL | 路由服务 API 基础地址 | https://your-router-api | - | API_KEY | 路由服务使用的鉴权 token,不需要则可留空 | routerapikey123 | - | DB_HOST | 数据库主机名或 IP | 127.0.0.1 | - | DB_PORT | 数据库端口 | 3306 | - | DB_USER | 数据库用户名 | root | - | DB_PASS | 数据库密码 | password | - | DB_NAME | 数据库名 | emby_bot_db | - | ADMIN_LIST | Bot 管理员的 Telegram ID 列表(用逗号分隔) | 123456789,987654321 | - -## 贡献指南 -欢迎贡献代码!为了确保项目的高质量和一致性,请遵循以下贡献规程: -### 提交规范 -- 提交信息必须符合 Angular 提交信息规范,格式如下: - - `type(scope): description` - - 例如:`feat(user): 添加用户注册功能` -### 与数据库、API 代码相关的更改 -- 请确保所有更改都经过严格的测试,并且不会引入新的错误。 -### 创建 Pull Request -- 请确保创建 Pull Request 前进行本地测试,确保通过所有 CI/CD 测试。 -### 支持的 Type 列表 - -| Type | 描述 | -|----------|-------------------------------------------------------------| -| feat | 添加新功能,比如新增用户注册、功能扩展等 | -| fix | 修复 bug 或错误,解决问题的修改 | -| docs | 文档相关修改,如更新说明文档、README、注释等 | -| style | 代码格式、标点、空格等修改,不影响代码逻辑运行 | -| refactor | 代码重构,调整代码结构而不改变功能 | -| perf | 性能优化修改,提升效率或降低资源消耗 | -| test | 添加或更新测试代码,保证项目稳定性 | -| chore | 杂项维护,如依赖更新、构建脚本修改,不涉及代码逻辑 | -| ci | 持续集成相关修改,如 GitHub Actions 工作流程优化 | - ---- -感谢您的贡献! - - -### Thanks -- [Pyrogram](https://docs.pyrogram.org/) - Telegram API for Python -- [SQLAlchemy](https://www.sqlalchemy.org/) - Python SQL Toolkit and Object-Relational Mapping -- [Emby服管理bot by小草](https://github.com/xiaocao666tzh/EmbyBot) \ No newline at end of file From d35d81b317468410c2cf261db66a8dd6969da9a2 Mon Sep 17 00:00:00 2001 From: quby Date: Fri, 14 Feb 2025 22:20:49 +0800 Subject: [PATCH 6/6] docs(readme): refactor and enhance README.md, move contribution guide to CONTRIBUTING.md --- CONTRIBUTING.md | 25 ++++++++++ README.md | 123 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 CONTRIBUTING.md create mode 100644 README.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..6ae3498 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,25 @@ +## 贡献指南 +欢迎贡献代码!为了确保项目的高质量和一致性,请遵循以下贡献规程: +### 提交规范 +- 提交信息必须符合 Angular 提交信息规范,格式如下: + - `type(scope): description` + - 例如:`feat(user): 添加用户注册功能` +### 与数据库、API 代码相关的更改 +- 请确保所有更改都经过严格的测试,并且不会引入新的错误。 +### 创建 Pull Request +- 请确保创建 Pull Request 前进行本地测试,确保通过所有 CI/CD 测试。 +### 支持的 Type 列表 + +| Type | 描述 | +|----------|-------------------------------------------------------------| +| feat | 添加新功能,比如新增用户注册、功能扩展等 | +| fix | 修复 bug 或错误,解决问题的修改 | +| docs | 文档相关修改,如更新说明文档、README、注释等 | +| style | 代码格式、标点、空格等修改,不影响代码逻辑运行 | +| refactor | 代码重构,调整代码结构而不改变功能 | +| perf | 性能优化修改,提升效率或降低资源消耗 | +| test | 添加或更新测试代码,保证项目稳定性 | +| chore | 杂项维护,如依赖更新、构建脚本修改,不涉及代码逻辑 | +| ci | 持续集成相关修改,如 GitHub Actions 工作流程优化 | +--- +感谢您的贡献! \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..67f4047 --- /dev/null +++ b/README.md @@ -0,0 +1,123 @@ +

+ embyBot logo +

+

embyBot

+

+ ⚡ The next-generation emby management bot +

+ +

+ +license + +python + +PEP 8 + + +codecov + +
+ +ruff + + +uv + + +pre-commit + + +pyright + + +ruff + +

+ +> [!IMPORTANT] +> ⭐️ Thanks **everyone** who has contributed to the project, it means a lot! + +## 📣 Introduction + +A Telegram bot for managing **Emby**, developed with **Pyrogram** and using **MySQL** as the database. + +## ✨ Features + + +#### User Management: + +- Create Emby users with invitation codes and assign default passwords and policies. +- Provide admin commands to disable/enable Emby accounts. +- View current user information (whitelist status, admin privileges, ban status, etc.). + +#### Invitation Code Management: + +- Generate standard and whitelist invitation codes. +- Automatically update the database and relevant status after using an invitation code. + +#### Route Management: + +- Integrate the routing service API to allow users to quickly switch playback routes through the bot. + +#### Other Features: + +- View the current number of Emby media items. +- Support limited-time or limited-quantity registration. + +## 🔰 Installation + +1. Clone the repository: + + ```bash + git clone https://github.com/embyplus/embyBot + ``` + +2. Copy and edit the environment variables file: + + ```bash + cp .env.example .env + vim .env + ``` + +3. Sync dependency environment + + ```bash + uv sync + ``` + +4. Run the bot: + + ```bash + uv run app.py + ``` + + +### ⛏ Code Quality + +### Unit Tests + +```shell +$ Writing... +``` + +### Integration Tests + +Continuous integration with [Sourcery-ai](https://sourcery.ai//). + +### Code Standards + +Use [Ruff](https://docs.astral.sh/ruff/) and [Codecov](https://codecov.io/) to maintain code quality. + +## Contribution Guide + +Please refer to the [Contribution Guide](./CONTRIBUTING.md). + +## Acknowledgments +- [Pyrogram](https://docs.pyrogram.org/) - Telegram API for Python +- [SQLAlchemy](https://www.sqlalchemy.org/) - Python SQL Toolkit and Object-Relational Mapping +- [Emby Management Bot by 小草](https://github.com/xiaocao666tzh/EmbyBot) + +## 🎡 Activities + +![Alt](https://repobeats.axiom.co/api/embed/079b23892e48f7b9e6be2f0cb2c66b2833eeffda.svg "Repobeats analytics image") \ No newline at end of file