Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 66 additions & 78 deletions core/emby_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import requests
import aiohttp
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 考虑将网络调用和错误处理逻辑提取到共享的帮助函数中,以减少代码重复。

考虑将网络调用和错误处理逻辑提取到共享的帮助函数中。例如,对于异步请求,您可以创建一个如下的助手:

```python
async def _fetch(session, method, url, *, headers=None, params=None, json=None):
    try:
        async with session.request(method, url, headers=headers, params=params, json=json) as response:
            if response.status != 200:
                raise Exception(f"Request failed with status code {response.status}")
            return await response.json()
    except aiohttp.ClientError as e:
        logger.error(f"Client error during {method} {url}: {e}", exc_info=True)
        raise Exception(f"请求路由服务时发生错误: {str(e)}")
    except asyncio.TimeoutError:
        logger.error(f"Request to {url} timed out", exc_info=True)
        raise Exception("请求路由服务超时,请稍后重试或检查网络连接。")

然后更新您的 call_api(和类似的方法)以使用它:

async def call_api(self, path: str):
    url = f"{self.api_url}{path}"
    headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
    logger.debug(f"Calling API at {url}")
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
        return await _fetch(session, "GET", url, headers=headers)

您可以类似地为您的同步请求创建带有 requests 的助手,以减少重复。这保持了功能完整,同时减少了重复的嵌套和 try/except 块。


<details>
<summary>Original comment in English</summary>

**issue (complexity):** Consider extracting the network call and error-handling logic into a shared helper function to reduce code duplication.

```markdown
Consider extracting the network call and error‐handling logic into a shared helper. For example, for asynchronous requests you can create a helper like:

```python
async def _fetch(session, method, url, *, headers=None, params=None, json=None):
    try:
        async with session.request(method, url, headers=headers, params=params, json=json) as response:
            if response.status != 200:
                raise Exception(f"Request failed with status code {response.status}")
            return await response.json()
    except aiohttp.ClientError as e:
        logger.error(f"Client error during {method} {url}: {e}", exc_info=True)
        raise Exception(f"请求路由服务时发生错误: {str(e)}")
    except asyncio.TimeoutError:
        logger.error(f"Request to {url} timed out", exc_info=True)
        raise Exception("请求路由服务超时,请稍后重试或检查网络连接。")

Then update your call_api (and similar methods) to use it:

async def call_api(self, path: str):
    url = f"{self.api_url}{path}"
    headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
    logger.debug(f"Calling API at {url}")
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
        return await _fetch(session, "GET", url, headers=headers)

You can similarly create a helper for your synchronous requests with requests to reduce duplication. This keeps functionality intact while reducing the repeated nesting and try/except blocks.


</details>

import asyncio

logger = logging.getLogger(__name__)

Expand All @@ -22,7 +23,7 @@ def __init__(self, emby_url: str, emby_api: str, timeout: int = 10):
f"EmbyApi initialized with URL: {self.base_url}, timeout: {self.timeout}"
)

def _request(self, method: str, path: str, data=None, params=None):
async def _request(self, method: str, path: str, data=None, params=None):
"""
内部通用请求方法,用于简化 GET / POST 等请求的异常处理、状态码检查等。

Expand All @@ -46,42 +47,28 @@ def _request(self, method: str, path: str, data=None, params=None):
logger.debug(
f"Making {method} request to {url} with params: {params}, data: {data}"
)
try:
if method.upper() == "GET":
response = requests.get(
url, params=params, timeout=self.timeout, headers=headers
)
elif method.upper() == "POST":
response = requests.post(
url, params=params, json=data, timeout=self.timeout, headers=headers
async with aiohttp.ClientSession(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): 考虑重用 ClientSession 以提高性能。

为每个请求创建一个新的 ClientSession 可能会导致不必要的开销和连接重建。根据使用模式,为类或应用程序的生命周期管理一个持久的 ClientSession 可能会有所帮助。

建议的实施方案:

    def __init__(self, timeout):
        self.timeout = timeout
        # Create a persistent aiohttp ClientSession for reuse
        self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout))
        # any other initialization code
            try:
                async with self._session.request(
                    method, url, params=params, json=data, headers=headers
                ) as response:
                    if response.status != 200:
                        raise Exception(
                            f"Request failed with status code {response.status}"
                        )
                    return await response.json()
            except aiohttp.ClientError as e:

由于 ClientSession 现在在初始化期间创建一次并且是持久的,因此您还应该添加一个方法,以便在您的类完成时关闭会话(例如,在关闭应用程序时)。
例如:
async def close(self):
await self._session.close()
您需要在应用程序生命周期的适当部分调用此 close() 方法。

Original comment in English

suggestion (performance): Consider reusing a ClientSession for performance.

Creating a new ClientSession for each request may lead to unnecessary overhead and connection re-establishment. Depending on the usage pattern, it might be beneficial to manage a persistent ClientSession for the lifetime of the class or application.

Suggested implementation:

    def __init__(self, timeout):
        self.timeout = timeout
        # Create a persistent aiohttp ClientSession for reuse
        self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout))
        # any other initialization code
            try:
                async with self._session.request(
                    method, url, params=params, json=data, headers=headers
                ) as response:
                    if response.status != 200:
                        raise Exception(
                            f"Request failed with status code {response.status}"
                        )
                    return await response.json()
            except aiohttp.ClientError as e:

Since the ClientSession is now created once during initialization and is persistent, you should also add a method to close the session when your class is done with it (for example, when shutting down the application).
For example:
async def close(self):
await self._session.close()
You'll need to call this close() method in the appropriate part of your application lifecycle.

timeout=aiohttp.ClientTimeout(total=self.timeout)
) as session:
try:
async with session.request(
method, url, params=params, json=data, headers=headers
) as response:
if response.status != 200:
raise Exception(
f"Request failed with status code {response.status}"
)
return await response.json()
except aiohttp.ClientError as e:
logger.error(
f"An error occurred while requesting Emby: {e}", exc_info=True
)
else:
raise Exception(f"暂不支持的 HTTP 方法: {method}")
raise Exception(f"请求 Emby 时发生错误: {str(e)}")
except asyncio.TimeoutError:
logger.error("Request to Emby server timed out", exc_info=True)
raise Exception("请求 Emby 服务器超时,请稍后重试或检查网络连接。")

except requests.exceptions.Timeout:
# 超时异常,抛出中文提示
logger.error("Request to Emby server timed out", exc_info=True)
raise Exception("请求 Emby 服务器超时,请稍后重试或检查网络连接。")
except requests.exceptions.ConnectionError as e:
# 连接异常
logger.error(f"Failed to connect to Emby server: {e}", exc_info=True)
raise Exception(f"无法连接到 Emby 服务器: {str(e)}")
except requests.exceptions.RequestException as e:
# 其他 requests 异常
logger.error(
f"An unknown error occurred while requesting Emby: {e}", exc_info=True
)
raise Exception(f"请求 Emby 时发生未知错误: {str(e)}")

try:
response.raise_for_status()
logger.debug(f"Request successful, status code: {response.status_code}")
except Exception as e:
logger.error(f"Emby API request failed: {e}", exc_info=True)
raise Exception(f"Emby API 请求失败")
return response.json() if response.text else None

def get_user(self, emby_id: str):
async def get_user(self, emby_id: str):
"""
根据用户 ID 获取 Emby 用户信息。
:param emby_id: Emby 用户 ID
Expand All @@ -90,14 +77,14 @@ def get_user(self, emby_id: str):
path = f"/emby/Users/{emby_id}"
logger.info(f"Getting user with Emby ID: {emby_id}")
try:
return self._request("GET", path)
return await self._request("GET", path)
except Exception as e:
logger.error(
f"Failed to get user with Emby ID {emby_id}: {e}", exc_info=True
)
raise

def create_user(self, name: str):
async def create_user(self, name: str):
"""
在 Emby 中创建新用户。
:param name: 用户名
Expand All @@ -107,12 +94,12 @@ def create_user(self, name: str):
data = {"Name": name, "HasPassword": False}
logger.info(f"Creating user with name: {name}")
try:
return self._request("POST", path, data=data)
return await self._request("POST", path, data=data)
except Exception as e:
logger.error(f"Failed to create user with name {name}: {e}", exc_info=True)
raise

def ban_user(self, emby_id: str):
async def ban_user(self, emby_id: str):
"""
禁用 Emby 用户:设置其 Policy,使其无法登录或观看。
:param emby_id: Emby 用户 ID
Expand Down Expand Up @@ -144,14 +131,14 @@ def ban_user(self, emby_id: str):
}
logger.info(f"Banning user with Emby ID: {emby_id}")
try:
return self.update_user_policy(emby_id, data)
return await self.update_user_policy(emby_id, data)
except Exception as e:
logger.error(
f"Failed to ban user with Emby ID {emby_id}: {e}", exc_info=True
)
raise

def set_default_policy(self, emby_id: str):
async def set_default_policy(self, emby_id: str):
"""
取消禁用或为新建用户设置默认权限 Policy。
:param emby_id: Emby 用户 ID
Expand Down Expand Up @@ -183,15 +170,15 @@ def set_default_policy(self, emby_id: str):
}
logger.info(f"Setting default policy for user with Emby ID: {emby_id}")
try:
return self.update_user_policy(emby_id, data)
return await self.update_user_policy(emby_id, data)
except Exception as e:
logger.error(
f"Failed to set default policy for user with Emby ID {emby_id}: {e}",
exc_info=True,
)
raise

def update_user_policy(self, emby_id: str, policy_data: dict):
async def update_user_policy(self, emby_id: str, policy_data: dict):
"""
更新 Emby 用户的 policy 设置,如是否禁用、并发数等。
:param emby_id: Emby 用户 ID
Expand All @@ -203,15 +190,15 @@ def update_user_policy(self, emby_id: str, policy_data: dict):
f"Updating user policy for Emby ID: {emby_id} with data: {policy_data}"
)
try:
return self._request("POST", path, data=policy_data)
return await self._request("POST", path, data=policy_data)
except Exception as e:
logger.error(
f"Failed to update user policy for Emby ID {emby_id}: {e}",
exc_info=True,
)
raise

def reset_user_password(self, emby_id: str):
async def reset_user_password(self, emby_id: str):
"""
重置用户密码(让 Emby 忘记当前密码,此后需要重新设置新密码)。
:param emby_id: Emby 用户 ID
Expand All @@ -221,15 +208,15 @@ def reset_user_password(self, emby_id: str):
data = {"ResetPassword": True}
logger.info(f"Resetting password for user with Emby ID: {emby_id}")
try:
return self._request("POST", path, data=data)
return await self._request("POST", path, data=data)
except Exception as e:
logger.error(
f"Failed to reset password for user with Emby ID {emby_id}: {e}",
exc_info=True,
)
raise

def set_user_password(self, emby_id: str, new_pass: str):
async def set_user_password(self, emby_id: str, new_pass: str):
"""
设置指定 Emby 用户的新密码。
:param emby_id: Emby 用户 ID
Expand All @@ -240,39 +227,37 @@ def set_user_password(self, emby_id: str, new_pass: str):
data = {"ResetPassword": False, "CurrentPw": "", "NewPw": new_pass}
logger.info(f"Setting password for user with Emby ID: {emby_id}")
try:
return self._request("POST", path, data=data)
return await self._request("POST", path, data=data)
except Exception as e:
logger.error(
f"Failed to set password for user with Emby ID {emby_id}: {e}",
exc_info=True,
)
raise

def check_emby_site(self) -> bool:
async def check_emby_site(self) -> bool:
"""
检查 Emby 是否可用,仅做简单的 200 检查。
:return: 若状态码为 200 则返回 True,否则抛出异常或返回 False
"""
path = "/emby/System/Info"
logger.info("Checking Emby site availability")
try:
self._request("GET", path)
await self._request("GET", path)
return True
except Exception as e:
logger.warning(f"Emby site check failed: {e}", exc_info=True)
# 这里可以选择记录日志,或者返回 False 让上层自己判断
# raise 或者 return False 看业务需求
return False

def count(self):
async def count(self):
"""
获取 Emby 中影视的数量汇总。
:return: 包含影视数量信息的 JSON
"""
path = "/emby/Items/Counts"
logger.info("Getting Emby item counts")
try:
return self._request("GET", path)
return await self._request("GET", path)
except Exception as e:
logger.error(f"Failed to get Emby item counts: {e}", exc_info=True)
raise
Expand All @@ -296,7 +281,7 @@ def __init__(self, api_url: str, api_key: str = "", timeout: int = 10):
f"EmbyRouterAPI initialized with URL: {self.api_url}, timeout: {self.timeout}"
)

def call_api(self, path: str):
async def call_api(self, path: str):
"""
路由API通用请求方法。
:param path: API路径
Expand All @@ -305,54 +290,57 @@ def call_api(self, path: str):
url = f"{self.api_url}{path}"
headers = {"Authorization": f"Bearer {self.api_key}"} if self.api_key else {}
logger.debug(f"Calling API at {url}")
try:
response = requests.get(url, headers=headers, timeout=self.timeout)
response.raise_for_status() # 如果状态码非 200-299,自动抛出异常
return response.json()
except requests.exceptions.Timeout:
logger.error("Request to router service timed out", exc_info=True)
raise Exception("请求路由服务超时,请稍后重试或检查网络连接。")
except requests.exceptions.ConnectionError as e:
logger.error(f"Failed to connect to router service: {e}", exc_info=True)
raise Exception(f"无法连接到路由服务: {str(e)}")
except requests.exceptions.RequestException as e:
logger.error(
f"An unknown error occurred while requesting router service: {e}",
exc_info=True,
)
raise Exception(f"请求路由服务时发生错误: {str(e)}")
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as session:
try:
async with session.get(url, headers=headers) as response:
if response.status != 200:
raise Exception(
f"Request failed with status code {response.status}"
)
return await response.json()
except aiohttp.ClientError as e:
logger.error(
f"An error occurred while requesting router service: {e}",
exc_info=True,
)
raise Exception(f"请求路由服务时发生错误: {str(e)}")
except asyncio.TimeoutError:
logger.error("Request to router service timed out", exc_info=True)
raise Exception("请求路由服务超时,请稍后重试或检查网络连接。")

def query_all_route(self):
async def query_all_route(self):
"""
获取所有可用线路。
"""
logger.info("Querying all routes")
try:
return self.call_api("/api/route")
return await self.call_api("/api/route")
except Exception as e:
logger.error(f"Failed to query all routes: {e}", exc_info=True)
raise

def query_user_route(self, user_id: str):
async def query_user_route(self, user_id: str):
"""
获取指定用户当前所选的线路信息。
"""
logger.info(f"Querying user route for user ID: {user_id}")
try:
return self.call_api(f"/api/route/{user_id}")
return await self.call_api(f"/api/route/{user_id}")
except Exception as e:
logger.error(
f"Failed to query user route for user ID {user_id}: {e}", exc_info=True
)
raise

def update_user_route(self, user_id: str, new_index: str):
async def update_user_route(self, user_id: str, new_index: str):
"""
更新用户当前所使用的线路。
"""
logger.info(f"Updating user route for user ID: {user_id} to index: {new_index}")
try:
return self.call_api(f"/api/route/{user_id}/{new_index}")
return await self.call_api(f"/api/route/{user_id}/{new_index}")
except Exception as e:
logger.error(
f"Failed to update user route for user ID {user_id} to index {new_index}: {e}",
Expand Down
7 changes: 4 additions & 3 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
- 限时或限量开放注册。

### 安装及运行
本项目用[uv](https://github.com/astral-sh/uv)管理及运行
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (typo): “用”字后缺少冒号

“用”字后似乎应该有一个冒号,以提高可读性。它应该读作“本项目用:uv管理及运行”

Suggested change
本项目用[uv](https://github.com/astral-sh/uv)管理及运行
本项目用[uv](https://github.com/astral-sh/uv)管理及运行
Original comment in English

suggestion (typo): Missing colon after "用"

It seems like there should be a colon after the word "用" to improve readability. It should read as "本项目用:uv管理及运行"

Suggested change
本项目用[uv](https://github.com/astral-sh/uv)管理及运行
本项目用[uv](https://github.com/astral-sh/uv)管理及运行

```bash
git clone https://github.com/embyplus/embyBot
cp .env.example .env
vim .env

python3 -m pip install -r requirements.txt
python3 app.py
uv sync
uv run app.py
```

### 配置环境变量
Expand Down Expand Up @@ -77,4 +78,4 @@ python3 app.py
### Thanks
- [Pyrogram](https://docs.pyrogram.org/) - Telegram API for Python
- [SQLAlchemy](https://www.sqlalchemy.org/) - Python SQL Toolkit and Object-Relational Mapping
- [Emby服管理bot by小草](https://github.com/xiaocao666tzh/EmbyBot)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (typo): 缺少空格和损坏的链接

“Emby”和“服管理bot”之间应该有一个空格。此外,“No newline at end of file”之前的反斜杠破坏了链接。

Original comment in English

issue (typo): Missing space and broken link

There should be a space between "Emby" and "服管理bot". Also, the backslash before "No newline at end of file" is breaking the link.

- [Emby服管理bot by小草](https://github.com/xiaocao666tzh/EmbyBot)
Loading