From d35fc04ac9fec0820856864a26989768b6b558d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9B=9B=E6=96=B9=E9=9C=B8=E4=B8=BB?= <88601983+sifangbazhu@users.noreply.github.com> Date: Thu, 13 Feb 2025 16:44:01 +0000 Subject: [PATCH] =?UTF-8?q?=E5=B7=B2=E7=BB=8F=E8=A7=A3=E6=9E=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81user=5Fservice=EF=BC=8C=E5=B9=B6=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=BA=86command=E4=B8=AD=E4=BB=A3=E7=A0=81=E4=BD=BF=E5=85=B6?= =?UTF-8?q?=E9=80=82=E9=85=8D=E7=BB=93=E6=9E=84=E5=90=8E=E7=9A=84=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot/command/admin_command.py | 20 +- bot/command/event_command.py | 14 +- bot/command/user_command.py | 28 +-- services/__init__.py | 13 +- services/config_service.py | 86 ++++++++ services/emby_api.py | 102 +++++++++ services/invite_code_service.py | 134 ++++++++++++ services/user_router.py | 23 ++ services/user_service.py | 365 -------------------------------- 9 files changed, 391 insertions(+), 394 deletions(-) create mode 100644 services/config_service.py create mode 100644 services/emby_api.py create mode 100644 services/invite_code_service.py create mode 100644 services/user_router.py delete mode 100644 services/user_service.py diff --git a/bot/command/admin_command.py b/bot/command/admin_command.py index 54028f1..93b8c60 100644 --- a/bot/command/admin_command.py +++ b/bot/command/admin_command.py @@ -8,15 +8,17 @@ from bot.utils import with_parsed_args, reply_html, send_error, \ with_ensure_args from bot.utils.message_helper import get_user_telegram_id -from services import UserService +from services import ConfigService, InviteCodeService, ServiceApi logger = logging.getLogger(__name__) class AdminCommandHandler: - def __init__(self, bot_client: BotClient, user_service: UserService): + def __init__(self, bot_client: BotClient, config_service: ConfigService, invite_code_service: InviteCodeService, emby_api: ServiceApi): self.bot_client = bot_client - self.user_service = user_service + self.config_service = config_service + self.invite_code_service = invite_code_service + self.emby_api = emby_api self.code_to_message_id = {} logger.info("AdminCommandHandler initialized") @@ -36,7 +38,7 @@ async def new_code(self, message: Message, args: list[str]): num = min(num, 20) try: code_list = await ( - self.user_service + self.invite_code_service .create_invite_code(message.from_user.id, num) ) await self.send_code(code_list, message) @@ -59,7 +61,7 @@ async def new_whitelist_code(self, message: Message, args: list[str]): num = min(num, 20) try: - code_list = await self.user_service.create_whitelist_code( + code_list = await self.invite_code_service.create_whitelist_code( message.from_user.id, num) await self.send_code(code_list, message, whitelist=True) except Exception as e: @@ -105,7 +107,7 @@ async def ban_emby(self, message: Message, args: list[str]): telegram_id = await get_user_telegram_id(self.bot_client.client, message) try: - if await self.user_service.emby_ban(telegram_id, reason, + if await self.emby_api.emby_ban(telegram_id, reason, operator_id): await reply_html( message, @@ -124,7 +126,7 @@ async def unban_emby(self, message: Message): telegram_id = await get_user_telegram_id(self.bot_client.client, message) try: - if await self.user_service.emby_unban(telegram_id, operator_id): + if await self.emby_api.emby_unban(telegram_id, operator_id): await reply_html( message, f"✅ 已解禁用户 {telegram_id} 的Emby账号" @@ -148,7 +150,7 @@ async def register_until(self, message: Message, args: list[str]): if time < now: return await reply_html(message, "❌ 时间必须晚于当前时间") - await self.user_service.set_emby_config( + await self.config_service.set_emby_config( message.from_user.id, register_public_time=int(time.timestamp()) ) @@ -168,7 +170,7 @@ async def register_amount(self, message: Message, args: list[str]): """ try: amount = int(args[0]) - await self.user_service.set_emby_config( + await self.config_service.set_emby_config( message.from_user.id, register_public_user=amount ) diff --git a/bot/command/event_command.py b/bot/command/event_command.py index 04f8f1c..9183c4d 100644 --- a/bot/command/event_command.py +++ b/bot/command/event_command.py @@ -4,15 +4,17 @@ from bot import BotClient from config import config -from services import UserService +from services import UserRouter, InviteCodeService, ServiceApi logger = logging.getLogger(__name__) class EventHandler: - def __init__(self, bot_client: BotClient, user_service: UserService): + def __init__(self, bot_client: BotClient, user_router: UserRouter, invite_code_service: InviteCodeService, emby_api: ServiceApi): self.bot_client = bot_client - self.user_service = user_service + self.user_router = user_router + self.invite_code_service = invite_code_service + self.emby_api = emby_api self.code_to_message_id = {} logger.info("EventHandler initialized") @@ -36,7 +38,7 @@ async def handle_callback_query(self, _, await callback_query.answer("线路不存在") return - await self.user_service.update_user_router( + await self.user_router.update_user_router( callback_query.from_user.id, index) await callback_query.answer("线路已更新") await callback_query.message.edit( @@ -54,11 +56,11 @@ async def group_member_change_handler(self, _, message: Message): """ if message.left_chat_member: left_member_id = message.left_chat_member.id - left_member = await self.user_service.must_get_user(left_member_id) + left_member = await self.invite_code_service.must_get_user(left_member_id) if (left_member.has_emby_account() and not left_member.is_emby_baned() and not left_member.is_whitelist): - await self.user_service.emby_ban(message.left_chat_member.id, + await self.emby_api.emby_ban(message.left_chat_member.id, "用户已退出群组") config.group_members.pop(message.left_chat_member.id, None) if message.new_chat_members: diff --git a/bot/command/user_command.py b/bot/command/user_command.py index 847e9a1..feedb83 100644 --- a/bot/command/user_command.py +++ b/bot/command/user_command.py @@ -9,15 +9,17 @@ from bot.utils.message_helper import get_user_telegram_id from config import config from models.invite_code_model import InviteCodeType -from services import UserService +from services import InviteCodeService, UserRouter, ServiceApi logger = logging.getLogger(__name__) class UserCommandHandler: - def __init__(self, bot_client: BotClient, user_service: UserService): + def __init__(self, bot_client: BotClient, user_router: UserRouter, invite_code_service: InviteCodeService, emby_api: ServiceApi): self.bot_client = bot_client - self.user_service = user_service + self.user_router = user_router + self.invite_code_service = invite_code_service + self.emby_api = emby_api self.code_to_message_id = {} logger.info("UserCommandHandler initialized") @@ -27,7 +29,7 @@ async def count(self, message: Message): 查询服务器内片子数量 """ try: - count_data = self.user_service.emby_count() + count_data = self.emby_api.emby_count() if not count_data: return await reply_html(message, "❌ 查询失败:无法获取数据") @@ -56,7 +58,7 @@ async def info(self, message: Message): telegram_id = await get_user_telegram_id(self.bot_client.client, message) try: - user, emby_info = await self.user_service.emby_info(telegram_id) + user, emby_info = await self.emby_api.emby_info(telegram_id) last_active = ( parse_iso8601_to_normal_date(emby_info.get("LastActivityDate")) if emby_info.get("LastActivityDate") else "无") @@ -95,7 +97,7 @@ async def use_code(self, message: Message, args: list[str]): code = args[0] telegram_id = message.from_user.id try: - used_code = await self.user_service.redeem_code(telegram_id, code) + used_code = await self.invite_code_service.redeem_code(telegram_id, code) if not used_code: return await reply_html(message, "❌ 邀请码使用失败") # 根据类型给出不同的回复 @@ -128,13 +130,13 @@ async def select_line(self, message: Message): telegram_id = message.from_user.id router_list = ( config.router_list or - await self.user_service.get_router_list(telegram_id) + await self.user_router.get_router_list(telegram_id) ) # 缓存到 config 中,减少重复获取 if router_list and not config.router_list: config.router_list = router_list - user_router = await self.user_service.get_user_router(telegram_id) + user_router = await self.user_router.get_user_router(telegram_id) user_router_index = user_router.get('index', '') message_text = f"当前线路:{user_router_index}\n请选择线路:" message_buttons = [] @@ -167,9 +169,9 @@ async def create_user(self, message: Message, args: list[str]): """ emby_name = args[0] try: - default_password = self.user_service.gen_default_passwd() + default_password = self.invite_code_service.gen_default_passwd() user = await ( - self.user_service.emby_create_user( + self.emby_api.emby_create_user( message.from_user.id, emby_name, default_password ) ) @@ -187,10 +189,10 @@ async def reset_emby_password(self, message: Message): """ /reset_emby_password """ - default_password = self.user_service.gen_default_passwd() + default_password = self.invite_code_service.gen_default_passwd() try: if await ( - self.user_service + self.emby_api .reset_password( message.from_user.id, default_password ) @@ -219,7 +221,7 @@ async def help_command(self, message: Message): "/count - 查看服务器内影片数量\n" "/help - 显示本帮助\n" ) - if await self.user_service.is_admin(message.from_user.id): + if await self.invite_code_service.is_admin(message.from_user.id): help_message += ( "\n管理命令:\n" "/new_code [数量] - 创建新的普通邀请码\n" diff --git a/services/__init__.py b/services/__init__.py index 254962a..7547bbd 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1 +1,12 @@ -from .user_service import UserService +from .invite_code_service import InviteCodeService +from .config_service import ConfigService +from .user_router import UserRouter +from .emby_api import ServiceApi + + +__all__ = [ + "InviteCodeService", + "UserRouter", + "ConfigService", + "ServiceApi" +] \ No newline at end of file diff --git a/services/config_service.py b/services/config_service.py new file mode 100644 index 0000000..c969834 --- /dev/null +++ b/services/config_service.py @@ -0,0 +1,86 @@ +from models.config_model import ConfigOrm +from models import User, Config +from datetime import datetime +from typing import Optional + +class ConfigService: + """Emby配置相关""" + + async def first_or_create_emby_config(self) -> Config: + """获取或创建 Emby 配置。""" + emby_config = await ConfigOrm().query_one(conds=[Config.id == 1]) + if not emby_config: + emby_config = Config( + register_public_user=0, + register_public_time=0, + total_register_user=0 + ) + await ConfigOrm().add(emby_config) + return emby_config + + async def emby_create_user(self, telegram_id: int, username: str, password: str) -> User: + """创建 Emby 用户(外部调用入口),先判断各种配置是否允许注册,然后调用内部的 _emby_create_user""" + user = await self.get_or_create_user_by_telegram_id(telegram_id) + if user.has_emby_account(): + raise Exception("该 Telegram 用户已经绑定过 Emby 账号,无法重复创建。") + + emby_config = await self.first_or_create_emby_config() + if not emby_config: + raise Exception("未找到 Emby 配置,无法创建账号。") + + if not await self._check_register_permission(user, emby_config): + raise Exception("当前没有可用的注册权限或名额,创建账号被拒绝。") + + async with ConfigOrm().transaction() as session: + if not user.enable_register and emby_config.register_public_user > 0: + emby_config.register_public_user -= 1 + + emby_config.total_register_user += 1 + new_user = await self._emby_create_user(telegram_id, username, password) + + session.add(new_user) + session.add(emby_config) + await session.commit() + return new_user + + async def _check_register_permission(self, user: User, emby_config: Config) -> bool: + """检查用户是否有权限注册 Emby 账号""" + enable_register = user.enable_register + if not enable_register and emby_config.register_public_user > 0: + enable_register = True + if ( + not enable_register + and emby_config.register_public_time > 0 + and datetime.now().timestamp() < emby_config.register_public_time + ): + enable_register = True + if 0 < emby_config.register_public_time < datetime.now().timestamp(): + await ConfigOrm().update( + values={'register_public_time': 0}, + conds=[Config.id == 1] + ) + return enable_register + + async def set_emby_config(self, telegram_id: int, register_public_user: Optional[int] = None, + register_public_time: Optional[int] = None) -> Config: + """设置 Emby 注册相关配置,如公共注册名额和公共注册截止时间""" + user = await self.must_get_user(telegram_id) + user.check_set_emby_config() + + emby_config = await self.first_or_create_emby_config() + if not emby_config: + raise Exception("未找到全局 Emby 配置,无法设置。") + + if register_public_user is not None: + emby_config.register_public_user = register_public_user + if register_public_time is not None: + emby_config.register_public_time = register_public_time + + await ConfigOrm().update( + values={ + 'register_public_user': emby_config.register_public_user, + 'register_public_time': emby_config.register_public_time + }, + conds=[Config.id == 1] + ) + return emby_config \ No newline at end of file diff --git a/services/emby_api.py b/services/emby_api.py new file mode 100644 index 0000000..ed59593 --- /dev/null +++ b/services/emby_api.py @@ -0,0 +1,102 @@ +from typing import Optional, Dict, Tuple +from models.user_model import UserOrm +from core.emby_api import EmbyApi +from datetime import datetime +from models import User +import logging + +logger = logging.getLogger(__name__) + +class ServiceApi: + """EmbyAPI相关操作""" + + def __init__(self, emby_api: EmbyApi): + self.emby_api = emby_api + + async def _emby_create_user(self, telegram_id: int, username: str, password: str) -> User: + """内部使用:真正调用 Emby API 创建用户,并设置初始密码""" + user = await self.get_or_create_user_by_telegram_id(telegram_id) + emby_user = self.emby_api.create_user(username) + if not emby_user or not emby_user.get("Id"): + raise Exception("在 Emby 系统中创建账号失败,请检查 Emby 服务是否正常。") + + emby_id = emby_user["Id"] + user.emby_id = emby_id + user.emby_name = username + user.enable_register = False + + # 设置初始密码 & 默认Policy + self.emby_api.set_user_password(emby_id, password) + self.emby_api.set_default_policy(emby_id) + return user + + async def emby_info(self, telegram_id: int) -> Tuple[User, Dict]: + """获取当前用户在 Emby 的信息""" + user = await self.must_get_user(telegram_id) + if not user.has_emby_account(): + raise Exception("该用户尚未绑定 Emby 账号。") + emby_user = self.emby_api.get_user(str(user.emby_id)) + if not emby_user: + raise Exception("从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。") + return user, emby_user + + async def reset_password(self, telegram_id: int, password: str = '') -> bool: + """重置用户的 Emby 密码。""" + user = await self.must_get_emby_user(telegram_id) + try: + self.emby_api.reset_user_password(user.emby_id) + self.emby_api.set_user_password(user.emby_id, password) + return True + except Exception as e: + logger.error(f"重置密码失败: {e}") + return False + + async def emby_ban(self, telegram_id: int, reason: str, operator_telegram_id: Optional[int] = None) -> bool: + """禁用用户""" + if operator_telegram_id is not None: + admin_user = await self.must_get_user(operator_telegram_id) + if not admin_user.is_admin: + raise Exception("您没有管理员权限,无法执行禁用操作。") + + user = await self.must_get_user(telegram_id) + user.check_emby_ban() + + try: + self.emby_api.ban_user(str(user.emby_id)) + user.ban_time = int(datetime.now().timestamp()) + user.reason = reason + await UserOrm().update( + {"ban_time": user.ban_time, "reason": reason}, + conds=[User.id == user.id] + ) + return True + except Exception as e: + logger.error(f"禁用用户失败: {e}") + return False + + async def emby_unban(self, telegram_id: int, operator_telegram_id: Optional[int] = None) -> bool: + """解禁用户""" + if operator_telegram_id is not None: + admin_user = await self.must_get_user(operator_telegram_id) + if not admin_user.is_admin: + raise Exception("您没有管理员权限,无法执行解禁操作。") + + user = await self.must_get_user(telegram_id) + user.check_emby_unban() + + try: + self.emby_api.set_default_policy(str(user.emby_id)) + user.ban_time = 0 + user.reason = "" + await UserOrm().update( + {"ban_time": 0, "reason": None}, + conds=[User.id == user.id] + ) + return True + except Exception as e: + logger.error(f"解禁用户失败: {e}") + return False + + def emby_count(self) -> Dict: + """从 Emby API 获取当前影片数量统计""" + return self.emby_api.count() \ No newline at end of file diff --git a/services/invite_code_service.py b/services/invite_code_service.py new file mode 100644 index 0000000..dfbb7b4 --- /dev/null +++ b/services/invite_code_service.py @@ -0,0 +1,134 @@ +from models import InviteCode +from models.invite_code_model import InviteCodeOrm, InviteCodeType +from models.user_model import User, UserOrm +from typing import List +from sqlalchemy import select +from datetime import datetime +from config import config +from random import sample +import shortuuid +import string +import re + + + +class InviteCodeService: + """邀请码服务""" + + @staticmethod + async def get_or_create_user_by_telegram_id(telegram_id: int) -> User: + """通过 telegram_id 从数据库获取用户,如果不存在则创建一个默认用户""" + user = await UserOrm().query_one(conds=[User.telegram_id == telegram_id]) + if not user: + default_user = User( + telegram_id=telegram_id, + is_admin=telegram_id in config.admin_list, + telegram_name=config.group_members.get(telegram_id, {}).get('username'), + ) + user_id = await UserOrm().add(default_user) + user = default_user + user.id = user_id + return user + + async def is_admin(self,telegram_id: int) -> bool: + """判断指定的 Telegram 用户是否为管理员""" + user = await self.get_or_create_user_by_telegram_id(telegram_id) + return user and user.is_admin + + async def must_get_user(self, telegram_id: int) -> User: + """获取指定用户信息,不存在则抛出异常""" + user = await self.get_or_create_user_by_telegram_id(telegram_id) + if user is None: + raise Exception("未找到该用户的信息。") + return user + + async def must_get_emby_user(self, telegram_id: int) -> User: + """确保用户存在且已创建 Emby 账号,若不存在则抛出异常""" + user = await self.must_get_user(telegram_id) + if user.emby_id is None: + raise Exception("该用户尚未绑定 Emby 账号,无法执行此操作。") + if user.ban_time is not None and user.ban_time > 0: + raise Exception("该用户的 Emby 账号已被禁用,无法执行此操作。") + return user + + @staticmethod + def gen_default_passwd() -> str: + """生成默认密码:随机6位的字母数字组合""" + return ''.join(sample(string.ascii_letters + string.digits, 6)) + + @staticmethod + def gen_register_code(num: int) -> List[str]: + """批量生成普通邀请码""" + return [f'epr-{str(shortuuid.uuid())}' for _ in range(num)] + + @staticmethod + def gen_whitelist_code(num: int) -> List[str]: + """批量生成白名单邀请码""" + return [f'epw-{str(shortuuid.uuid())}' for _ in range(num)] + + async def create_invite_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]: + """创建普通邀请码,需检测用户是否有权限""" + user = await self.must_get_user(telegram_id) + if not user.check_create_invite_code(): + raise Exception("您没有权限生成普通邀请码。") + + code_objs = [ + InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.REGISTER) + for code in self.gen_register_code(count) + ] + return await InviteCodeOrm().bulk_add(code_objs) + + async def create_whitelist_code(self, telegram_id: int, count: int = 1) -> List[InviteCode]: + """创建白名单邀请码,需检测用户是否有权限""" + user = await self.must_get_user(telegram_id) + if not user.check_create_whitelist_code(): + raise Exception("您没有权限生成白名单邀请码。") + + code_objs = [ + InviteCode(code=code, telegram_id=telegram_id, code_type=InviteCodeType.WHITELIST) + for code in self.gen_whitelist_code(count) + ] + return await InviteCodeOrm().bulk_add(code_objs) + + async def redeem_code(self, telegram_id: int, code: str): + """使用邀请码,分为普通注册邀请码和白名单邀请码""" + pattern = re.compile(r'^(epr|epw)-[A-Za-z0-9]+$') + if not pattern.match(code): + raise Exception("邀请码格式不正确。") + + user = await self.must_get_user(telegram_id) + + # 使用事务块,并通过行锁防止并发问题 + async with InviteCodeOrm().transaction() as session: + # 构造 SELECT 语句,并加上 FOR UPDATE 行锁 + stmt = select(InviteCode).where(InviteCode.code == code).with_for_update() + result = await session.execute(stmt) + valid_code = result.scalars().first() + + if not valid_code or valid_code.is_used: + raise Exception("该邀请码无效或已被使用。") + + # 根据邀请码类型执行不同的业务逻辑校验 + if valid_code.code_type == InviteCodeType.REGISTER: + user.check_use_redeem_code() + elif valid_code.code_type == InviteCodeType.WHITELIST: + user.check_use_whitelist_code() + if user.is_emby_baned(): + await self.emby_unban(telegram_id) + + # 标记邀请码已使用,并记录使用时间和使用者 + valid_code.is_used = True + valid_code.used_time = datetime.now().timestamp() + valid_code.used_user_id = telegram_id + + # 根据邀请码类型更新用户状态 + if valid_code.code_type == InviteCodeType.REGISTER: + user.enable_register = True + elif valid_code.code_type == InviteCodeType.WHITELIST: + user.is_whitelist = True + + session.add(valid_code) + session.add(user) + await session.commit() + + return valid_code \ No newline at end of file diff --git a/services/user_router.py b/services/user_router.py new file mode 100644 index 0000000..0461bfb --- /dev/null +++ b/services/user_router.py @@ -0,0 +1,23 @@ +from core.emby_api import EmbyRouterAPI +from typing import List, Dict + +class UserRouter: + """用户线路相关操作""" + + def __init__(self, emby_router_api: EmbyRouterAPI): + self.emby_router_api = emby_router_api + + async def get_user_router(self, telegram_id: int) -> Dict: + """获取用户的线路信息""" + user = await self.must_get_emby_user(telegram_id) + return self.emby_router_api.query_user_route(user.emby_id) + + async def update_user_router(self, telegram_id: int, new_index: str) -> bool: + """更新用户线路信息""" + user = await self.must_get_emby_user(telegram_id) + return self.emby_router_api.update_user_route(str(user.emby_id), str(new_index)) + + async def get_router_list(self, telegram_id: int) -> List[Dict]: + """获取所有可用线路""" + await self.must_get_emby_user(telegram_id) + return self.emby_router_api.query_all_route() \ No newline at end of file diff --git a/services/user_service.py b/services/user_service.py deleted file mode 100644 index dbb0dbc..0000000 --- a/services/user_service.py +++ /dev/null @@ -1,365 +0,0 @@ -import logging -import re -import string -from datetime import datetime -from random import sample -from typing import Optional, List, Dict, Tuple - -import shortuuid -from sqlalchemy import select - -from config import config -from core.emby_api import EmbyApi, EmbyRouterAPI -from models import User, Config, InviteCode -from models.config_model import ConfigOrm -from models.invite_code_model import InviteCodeOrm, InviteCodeType -from models.user_model import UserOrm - -logger = logging.getLogger(__name__) - - -async def first_or_create_emby_config() -> Config: - """获取或创建 Emby 配置。""" - emby_config = await ConfigOrm().query_one(conds=[Config.id == 1]) - if not emby_config: - emby_config = Config( - register_public_user=0, register_public_time=0, - total_register_user=0 - ) - await ConfigOrm().add(emby_config) - return emby_config - - -async def _check_register_permission(user: User, - emby_config: Config) -> bool: - """检查用户是否有权限注册 Emby 账号""" - enable_register = user.enable_register - if not enable_register and emby_config.register_public_user > 0: - enable_register = True - if ( - not enable_register - and emby_config.register_public_time > 0 - and (datetime.now().timestamp() - < emby_config.register_public_time) - ): - enable_register = True - if 0 < emby_config.register_public_time < datetime.now().timestamp(): - await ConfigOrm().update( - values={"register_public_time": 0}, conds=[Config.id == 1] - ) - return enable_register - - -class UserService: - """用户与 Emby 相关的业务逻辑层""" - - def __init__(self, emby_api: EmbyApi, emby_router_api: EmbyRouterAPI): - self.emby_api = emby_api - self.emby_router_api = emby_router_api - - @staticmethod - async def get_or_create_user_by_telegram_id(telegram_id: int) -> User: - """通过 telegram_id 从数据库获取用户,如果不存在则创建一个默认用户""" - user = await UserOrm().query_one( - conds=[User.telegram_id == telegram_id]) - if not user: - default_user = User( - telegram_id=telegram_id, - is_admin=telegram_id in config.admin_list, - telegram_name=config.group_members.get(telegram_id, - {}).username - if config.group_members.get(telegram_id) - else None, - ) - user_id = await UserOrm().add(default_user) - user = default_user - user.id = user_id - return user - - @staticmethod - async def is_admin(telegram_id: int) -> bool: - """判断指定的 Telegram 用户是否为管理员""" - user = await UserService.get_or_create_user_by_telegram_id(telegram_id) - return user and user.is_admin - - async def must_get_user(self, telegram_id: int) -> User: - """获取指定用户信息,不存在则抛出异常""" - user = await self.get_or_create_user_by_telegram_id(telegram_id) - if user is None: - raise Exception("未找到该用户的信息。") - return user - - async def must_get_emby_user(self, telegram_id: int) -> User: - """确保用户存在且已创建 Emby 账号,若不存在则抛出异常""" - user = await self.must_get_user(telegram_id) - if user.emby_id is None: - raise Exception("该用户尚未绑定 Emby 账号,无法执行此操作。") - if user.ban_time is not None and user.ban_time > 0: - raise Exception("该用户的 Emby 账号已被禁用,无法执行此操作。") - return user - - async def _emby_create_user( - self, telegram_id: int, username: str, password: str - ) -> User: - """内部使用:真正调用 Emby API 创建用户,并设置初始密码""" - user = await self.get_or_create_user_by_telegram_id(telegram_id) - emby_user = self.emby_api.create_user(username) - if not emby_user or not emby_user.get("Id"): - raise Exception( - "在 Emby 系统中创建账号失败,请检查 Emby 服务是否正常。") - - emby_id = emby_user["Id"] - user.emby_id = emby_id - user.emby_name = username - user.enable_register = False - - # 设置初始密码 & 默认Policy - self.emby_api.set_user_password(emby_id, password) - self.emby_api.set_default_policy(emby_id) - return user - - @staticmethod - def gen_default_passwd() -> str: - """生成默认密码:随机6位的字母数字组合""" - return "".join(sample(string.ascii_letters + string.digits, 6)) - - @staticmethod - def gen_register_code(num: int) -> List[str]: - """批量生成普通邀请码""" - return [f"epr-{str(shortuuid.uuid())}" for _ in range(num)] - - @staticmethod - def gen_whitelist_code(num: int) -> List[str]: - """批量生成白名单邀请码""" - return [f"epw-{str(shortuuid.uuid())}" for _ in range(num)] - - async def create_invite_code( - self, telegram_id: int, count: int = 1 - ) -> List[InviteCode]: - """创建普通邀请码,需检测用户是否有权限""" - user = await self.must_get_user(telegram_id) - if not user.check_create_invite_code(): - raise Exception("您没有权限生成普通邀请码。") - - code_objs = [ - InviteCode( - code=code, telegram_id=telegram_id, - code_type=InviteCodeType.REGISTER - ) - for code in self.gen_register_code(count) - ] - return await InviteCodeOrm().bulk_add(code_objs) - - async def create_whitelist_code( - self, telegram_id: int, count: int = 1 - ) -> List[InviteCode]: - """创建白名单邀请码,需检测用户是否有权限""" - user = await self.must_get_user(telegram_id) - if not user.check_create_whitelist_code(): - raise Exception("您没有权限生成白名单邀请码。") - - code_objs = [ - InviteCode( - code=code, telegram_id=telegram_id, - code_type=InviteCodeType.WHITELIST - ) - for code in self.gen_whitelist_code(count) - ] - return await InviteCodeOrm().bulk_add(code_objs) - - async def emby_info(self, telegram_id: int) -> Tuple[User, Dict]: - """获取当前用户在 Emby 的信息""" - user = await self.must_get_user(telegram_id) - if not user.has_emby_account(): - raise Exception("该用户尚未绑定 Emby 账号。") - emby_user = self.emby_api.get_user(str(user.emby_id)) - if not emby_user: - raise Exception( - "从 Emby 服务器获取用户信息失败,请检查 Emby 服务是否正常。" - ) - return user, emby_user - - async def emby_create_user( - self, telegram_id: int, username: str, password: str - ) -> User: - """创建 Emby 用户(外部调用入口),先判断各种配置是否允许注册,然后调用内部的 _emby_create_user""" - user = await self.get_or_create_user_by_telegram_id(telegram_id) - if user.has_emby_account(): - raise Exception( - "该 Telegram 用户已经绑定过 Emby 账号,无法重复创建。") - - emby_config = await first_or_create_emby_config() - if not emby_config: - raise Exception("未找到 Emby 配置,无法创建账号。") - - if not await _check_register_permission(user, emby_config): - raise Exception("当前没有可用的注册权限或名额,创建账号被拒绝。") - - async with ConfigOrm().transaction() as session: - if (not user.enable_register - and emby_config.register_public_user > 0): - emby_config.register_public_user -= 1 - - emby_config.total_register_user += 1 - new_user = await self._emby_create_user(telegram_id, username, - password) - - session.add(new_user) - session.add(emby_config) - await session.commit() - return new_user - - async def redeem_code(self, telegram_id: int, code: str): - """使用邀请码,分为普通注册邀请码和白名单邀请码""" - pattern = re.compile(r"^(epr|epw)-[A-Za-z0-9]+$") - if not pattern.match(code): - raise Exception("邀请码格式不正确。") - - user = await self.must_get_user(telegram_id) - - # 使用事务块,并通过行锁防止并发问题 - async with InviteCodeOrm().transaction() as session: - # 构造 SELECT 语句,并加上 FOR UPDATE 行锁 - stmt = select(InviteCode).where( - InviteCode.code == code).with_for_update() - result = await session.execute(stmt) - valid_code = result.scalars().first() - - if not valid_code or valid_code.is_used: - raise Exception("该邀请码无效或已被使用。") - - # 根据邀请码类型执行不同的业务逻辑校验 - if valid_code.code_type == InviteCodeType.REGISTER: - user.check_use_redeem_code() - elif valid_code.code_type == InviteCodeType.WHITELIST: - user.check_use_whitelist_code() - if user.is_emby_baned(): - await self.emby_unban(telegram_id) - - # 标记邀请码已使用,并记录使用时间和使用者 - valid_code.is_used = True - valid_code.used_time = datetime.now().timestamp() - valid_code.used_user_id = telegram_id - - # 根据邀请码类型更新用户状态 - if valid_code.code_type == InviteCodeType.REGISTER: - user.enable_register = True - elif valid_code.code_type == InviteCodeType.WHITELIST: - user.is_whitelist = True - - session.add(valid_code) - session.add(user) - await session.commit() - - return valid_code - - async def reset_password(self, telegram_id: int, - password: str = "") -> bool: - """重置用户的 Emby 密码。""" - user = await self.must_get_emby_user(telegram_id) - try: - self.emby_api.reset_user_password(user.emby_id) - self.emby_api.set_user_password(user.emby_id, password) - return True - except Exception as e: - logger.error(f"重置密码失败: {e}") - return False - - async def emby_ban( - self, telegram_id: int, reason: str, - operator_telegram_id: Optional[int] = None - ) -> bool: - """禁用用户""" - if operator_telegram_id is not None: - admin_user = await self.must_get_user(operator_telegram_id) - if not admin_user.is_admin: - raise Exception("您没有管理员权限,无法执行禁用操作。") - - user = await self.must_get_user(telegram_id) - user.check_emby_ban() - - try: - self.emby_api.ban_user(str(user.emby_id)) - user.ban_time = int(datetime.now().timestamp()) - user.reason = reason - await UserOrm().update( - {"ban_time": user.ban_time, "reason": reason}, - conds=[User.id == user.id], - ) - return True - except Exception as e: - logger.error(f"禁用用户失败: {e}") - return False - - async def emby_unban( - self, telegram_id: int, operator_telegram_id: Optional[int] = None - ) -> bool: - """解禁用户""" - if operator_telegram_id is not None: - admin_user = await self.must_get_user(operator_telegram_id) - if not admin_user.is_admin: - raise Exception("您没有管理员权限,无法执行解禁操作。") - - user = await self.must_get_user(telegram_id) - user.check_emby_unban() - - try: - self.emby_api.set_default_policy(str(user.emby_id)) - user.ban_time = 0 - user.reason = "" - await UserOrm().update( - {"ban_time": 0, "reason": None}, conds=[User.id == user.id] - ) - return True - except Exception as e: - logger.error(f"解禁用户失败: {e}") - return False - - async def set_emby_config( - self, - telegram_id: int, - register_public_user: Optional[int] = None, - register_public_time: Optional[int] = None, - ) -> Config: - """设置 Emby 注册相关配置,如公共注册名额和公共注册截止时间""" - user = await self.must_get_user(telegram_id) - user.check_set_emby_config() - - emby_config = await first_or_create_emby_config() - if not emby_config: - raise Exception("未找到全局 Emby 配置,无法设置。") - - if register_public_user is not None: - emby_config.register_public_user = register_public_user - if register_public_time is not None: - emby_config.register_public_time = register_public_time - - await ConfigOrm().update( - values={ - "register_public_user": emby_config.register_public_user, - "register_public_time": emby_config.register_public_time, - }, - conds=[Config.id == 1], - ) - return emby_config - - def emby_count(self) -> Dict: - """从 Emby API 获取当前影片数量统计""" - return self.emby_api.count() - - async def get_user_router(self, telegram_id: int) -> Dict: - """获取用户的线路信息""" - user = await self.must_get_emby_user(telegram_id) - return self.emby_router_api.query_user_route(user.emby_id) - - async def update_user_router(self, telegram_id: int, - new_index: str) -> bool: - """更新用户线路信息""" - user = await self.must_get_emby_user(telegram_id) - return self.emby_router_api.update_user_route(str(user.emby_id), - str(new_index)) - - async def get_router_list(self, telegram_id: int) -> List[Dict]: - """获取所有可用线路""" - await self.must_get_emby_user(telegram_id) - return self.emby_router_api.query_all_route()