From dd7135b6bba84954fe2d9720260fdeb32f13b997 Mon Sep 17 00:00:00 2001 From: GEYUANwuqi <17539198883@163.com> Date: Sun, 22 Mar 2026 01:03:23 +0800 Subject: [PATCH] =?UTF-8?q?feat(bilibili):=20=E6=96=B0=E5=A2=9E=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E9=A1=B5=E5=A4=9Auid=E8=BD=AE=E8=AF=A2=20-=20?= =?UTF-8?q?=E7=9B=B8=E5=85=B3test=E6=96=87=E4=BB=B6=E5=B7=B2=E9=9A=8F?= =?UTF-8?q?=E6=9C=AC=E6=AC=A1=E6=8F=90=E4=BA=A4=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ncatbot/adapter/bilibili/adapter.py | 7 +- ncatbot/adapter/bilibili/api/dynamic.py | 25 ++- ncatbot/adapter/bilibili/config.py | 2 + ncatbot/adapter/bilibili/source/__init__.py | 2 + .../bilibili/source/dynamic_page_source.py | 177 +++++++++++++++ ncatbot/adapter/bilibili/source/manager.py | 37 ++++ ncatbot/api/bilibili/interface.py | 14 ++ .../test_bilibili_dynamic_page_source.py | 209 ++++++++++++++++++ tests/unit/adapter/test_bilibili_parser.py | 1 + 9 files changed, 472 insertions(+), 2 deletions(-) create mode 100644 ncatbot/adapter/bilibili/source/dynamic_page_source.py create mode 100644 tests/unit/adapter/test_bilibili_dynamic_page_source.py diff --git a/ncatbot/adapter/bilibili/adapter.py b/ncatbot/adapter/bilibili/adapter.py index ca0e6aa5..0e3c095f 100644 --- a/ncatbot/adapter/bilibili/adapter.py +++ b/ncatbot/adapter/bilibili/adapter.py @@ -96,6 +96,7 @@ async def connect(self) -> None: session_poll_interval=self._config.session_poll_interval, comment_poll_interval=self._config.comment_poll_interval, dynamic_poll_interval=self._config.dynamic_poll_interval, + dynamic_page_poll_interval=self._config.dynamic_page_poll_interval, ) self._api = BiliBotAPI(self._credential, self._source_manager) @@ -115,13 +116,17 @@ async def connect(self) -> None: for watch in self._config.dynamic_watches: await self._source_manager.add_dynamic_watch(watch.uid, self._credential) + for watch in self._config.dynamic_page_watches: + await self._api.add_dynamic_page_watch(watch.uid) + self._connected = True LOG.info( - "Bilibili 适配器已连接 (直播间: %d, 私信: %s, 评论: %d, 动态: %d)", + "Bilibili 适配器已连接 (直播间: %d, 私信: %s, 评论: %d, 动态: %d, 动态页: %d)", len(self._config.live_rooms), self._config.enable_session, len(self._config.comment_watches), len(self._config.dynamic_watches), + len(self._config.dynamic_page_watches), ) async def listen(self) -> None: diff --git a/ncatbot/adapter/bilibili/api/dynamic.py b/ncatbot/adapter/bilibili/api/dynamic.py index 29a69d55..9f19df18 100644 --- a/ncatbot/adapter/bilibili/api/dynamic.py +++ b/ncatbot/adapter/bilibili/api/dynamic.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import Optional +from typing import Any, Optional class DynamicAPIMixin: @@ -33,3 +33,26 @@ async def add_dynamic_watch(self, uid: int) -> None: async def remove_dynamic_watch(self, uid: int) -> None: """移除动态监听""" await self._source_manager.remove_dynamic_watch(uid) + + async def add_dynamic_page_watch(self, uid: int) -> None: + """添加动态页监听(先关注再加入轮询)""" + await self.follow_user(uid) + await self._source_manager.add_dynamic_page_watch(uid, self._credential) + + async def remove_dynamic_page_watch(self, uid: int) -> None: + """移除动态页监听""" + await self._source_manager.remove_dynamic_page_watch(uid) + + async def follow_user(self, uid: int) -> Any: + """关注用户(已关注或关注自己时静默跳过)""" + from bilibili_api.user import User, RelationType + from bilibili_api.exceptions import ResponseCodeException + + user = User(uid=uid, credential=self._credential) + try: + return await user.modify_relation(RelationType.SUBSCRIBE) + except ResponseCodeException as e: + # 22001: 不能关注自己 22014: 已经关注 + if e.code in (22001, 22014): + return None + raise diff --git a/ncatbot/adapter/bilibili/config.py b/ncatbot/adapter/bilibili/config.py index 63306bf8..19165ec5 100644 --- a/ncatbot/adapter/bilibili/config.py +++ b/ncatbot/adapter/bilibili/config.py @@ -35,11 +35,13 @@ class BilibiliConfig(BaseModel): enable_session: bool = False comment_watches: List[CommentWatch] = Field(default_factory=list) dynamic_watches: List[DynamicWatch] = Field(default_factory=list) + dynamic_page_watches: List[DynamicWatch] = Field(default_factory=list) # 轮询配置 session_poll_interval: float = 6.0 comment_poll_interval: float = 30.0 dynamic_poll_interval: float = 600.0 + dynamic_page_poll_interval: float = 180.0 # 连接 max_retry: int = 5 diff --git a/ncatbot/adapter/bilibili/source/__init__.py b/ncatbot/adapter/bilibili/source/__init__.py index d746e7f3..8d2b9553 100644 --- a/ncatbot/adapter/bilibili/source/__init__.py +++ b/ncatbot/adapter/bilibili/source/__init__.py @@ -3,6 +3,7 @@ from .session_source import SessionSource from .comment_source import CommentSource from .dynamic_source import DynamicSource +from .dynamic_page_source import DynamicPageSource from .manager import SourceManager __all__ = [ @@ -11,5 +12,6 @@ "SessionSource", "CommentSource", "DynamicSource", + "DynamicPageSource", "SourceManager", ] diff --git a/ncatbot/adapter/bilibili/source/dynamic_page_source.py b/ncatbot/adapter/bilibili/source/dynamic_page_source.py new file mode 100644 index 00000000..6f6cfb73 --- /dev/null +++ b/ncatbot/adapter/bilibili/source/dynamic_page_source.py @@ -0,0 +1,177 @@ +"""动态页数据源 — 轮询动态主页,一次性监听多个订阅 UP 主的新动态 + +通过 bilibili_api.dynamic.get_dynamic_page_info 拉取动态主页, +按订阅 UID 集合过滤,基于每个 UP 主的最新时间戳增量检测新动态。 +""" + +from __future__ import annotations + +import asyncio +from typing import Any, Awaitable, Callable, Dict, Optional, Set + +from ncatbot.utils import get_log + +from .base import BaseSource + +LOG = get_log("DynamicPageSource") + +# 默认轮询间隔 3 分钟 +_DEFAULT_POLL_INTERVAL = 180.0 + + +class DynamicPageSource(BaseSource): + source_type = "dynamic" + + def __init__( + self, + credential: Any, + callback: Callable[[str, dict], Awaitable[None]], + *, + poll_interval: float = _DEFAULT_POLL_INTERVAL, + ) -> None: + super().__init__(callback) + self.source_id = "dynamic_page" + self._credential = credential + self._poll_interval = poll_interval + # 订阅的 UP 主 UID 集合 + self._watched_uids: Set[int] = set() + # 每个 UP 主的最新已知动态时间戳 + self._last_ts: Dict[int, int] = {} + self._task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() + + def add_uid(self, uid: int) -> None: + """添加一个订阅 UID""" + self._watched_uids.add(uid) + + def remove_uid(self, uid: int) -> None: + """移除一个订阅 UID""" + self._watched_uids.discard(uid) + self._last_ts.pop(uid, None) + + def has_uid(self, uid: int) -> bool: + return uid in self._watched_uids + + @property + def empty(self) -> bool: + return len(self._watched_uids) == 0 + + async def start(self) -> None: + if self._running: + return + self._stop_event.clear() + self._running = True + self._task = asyncio.create_task(self._poll_loop(), name="dynamic_page_source") + LOG.info( + "动态页源已启动 (轮询间隔 %.0fs, 订阅 %d 个 UID)", + self._poll_interval, + len(self._watched_uids), + ) + + # ---- 静态工具 ---- + + @staticmethod + def _extract_uid(item: dict) -> int: + """从动态 item 中提取作者 UID""" + modules = item.get("modules") or {} + author = modules.get("module_author") or {} + return int(author.get("mid", 0)) + + @staticmethod + def _extract_pub_ts(item: dict) -> int: + """提取动态发布时间戳""" + modules = item.get("modules") or {} + author = modules.get("module_author") or {} + return int(author.get("pub_ts", 0)) + + # ---- 轮询 ---- + + async def _fetch_page(self) -> list: + """拉取动态主页第一页""" + from bilibili_api.dynamic import get_dynamic_page_info + + resp = await get_dynamic_page_info(credential=self._credential) + return resp.get("items") or [] + + async def _poll_loop(self) -> None: + # 初次拉取,建立时间戳基线 + try: + items = await self._fetch_page() + self._init_baselines(items) + except Exception: + LOG.warning("动态页源初次拉取失败", exc_info=True) + + while not self._stop_event.is_set(): + try: + await asyncio.wait_for( + self._stop_event.wait(), timeout=self._poll_interval + ) + break + except asyncio.TimeoutError: + pass + + try: + items = await self._fetch_page() + await self._detect_new_dynamics(items) + except asyncio.CancelledError: + break + except Exception: + LOG.debug("动态页源轮询异常", exc_info=True) + + self._running = False + + def _init_baselines(self, items: list) -> None: + """用首次拉取的结果初始化每个订阅 UID 的时间戳基线""" + for item in items: + uid = self._extract_uid(item) + if uid not in self._watched_uids: + continue + ts = self._extract_pub_ts(item) + # 只记录更大的时间戳(同一 UID 可能有多条) + if ts > self._last_ts.get(uid, 0): + self._last_ts[uid] = ts + LOG.debug("动态页源基线: %s", self._last_ts) + + async def _detect_new_dynamics(self, items: list) -> None: + """检测并推送新动态""" + # 按 UID 分组,收集每个订阅 UP 主的动态 + uid_items: Dict[int, list] = {} + for item in items: + uid = self._extract_uid(item) + if uid not in self._watched_uids: + continue + uid_items.setdefault(uid, []).append(item) + + for uid, dyn_items in uid_items.items(): + old_ts = self._last_ts.get(uid, 0) + # 过滤出新动态(时间戳严格大于已知最大值) + new_items = [it for it in dyn_items if self._extract_pub_ts(it) > old_ts] + if not new_items: + continue + + # 按时间戳升序推送,确保先发的先推 + new_items.sort(key=self._extract_pub_ts) + for item in new_items: + ts = self._extract_pub_ts(item) + raw = { + "source": "dynamic", + "uid": uid, + "status": "new", + "dynamic": item, + } + await self._callback("dynamic", raw) + # 更新时间戳 + if ts > self._last_ts.get(uid, 0): + self._last_ts[uid] = ts + + async def stop(self) -> None: + self._stop_event.set() + if self._task is not None and not self._task.done(): + self._task.cancel() + try: + await self._task + except (asyncio.CancelledError, Exception): + pass + self._running = False + self._task = None + LOG.debug("动态页源已停止") diff --git a/ncatbot/adapter/bilibili/source/manager.py b/ncatbot/adapter/bilibili/source/manager.py index 2398112c..5777308e 100644 --- a/ncatbot/adapter/bilibili/source/manager.py +++ b/ncatbot/adapter/bilibili/source/manager.py @@ -16,6 +16,7 @@ from .session_source import SessionSource from .comment_source import CommentSource from .dynamic_source import DynamicSource +from .dynamic_page_source import DynamicPageSource LOG = get_log("SourceManager") @@ -32,6 +33,7 @@ def __init__( session_poll_interval: float = 6.0, comment_poll_interval: float = 30.0, dynamic_poll_interval: float = 600.0, + dynamic_page_poll_interval: float = 180.0, ) -> None: self._callback = callback self._sources: Dict[str, BaseSource] = {} @@ -40,6 +42,7 @@ def __init__( self._session_poll_interval = session_poll_interval self._comment_poll_interval = comment_poll_interval self._dynamic_poll_interval = dynamic_poll_interval + self._dynamic_page_poll_interval = dynamic_page_poll_interval self._stop_event = asyncio.Event() # ---- 直播间 ---- @@ -107,6 +110,40 @@ async def remove_dynamic_watch(self, uid: int) -> None: if source is not None: await source.stop() + # ---- 动态页(多 UP 主合并轮询) ---- + + def _get_dynamic_page_source(self, credential: Any) -> DynamicPageSource: + """获取或创建全局唯一的 DynamicPageSource""" + key = "dynamic_page" + if key not in self._sources: + source = DynamicPageSource( + credential=credential, + callback=self._callback, + poll_interval=self._dynamic_page_poll_interval, + ) + self._sources[key] = source + return self._sources[key] # type: ignore[return-value] + + async def add_dynamic_page_watch(self, uid: int, credential: Any) -> None: + source = self._get_dynamic_page_source(credential) + if source.has_uid(uid): + LOG.warning("动态页源 UID %s 已存在,跳过", uid) + return + source.add_uid(uid) + if not source.running: + await source.start() + LOG.info("动态页源新增监听 UID: %s", uid) + + async def remove_dynamic_page_watch(self, uid: int) -> None: + key = "dynamic_page" + source = self._sources.get(key) + if source is None or not isinstance(source, DynamicPageSource): + return + source.remove_uid(uid) + if source.empty: + await source.stop() + self._sources.pop(key, None) + # ---- 评论 ---- async def add_comment_watch( diff --git a/ncatbot/api/bilibili/interface.py b/ncatbot/api/bilibili/interface.py index 8069d1bd..2c5bc575 100644 --- a/ncatbot/api/bilibili/interface.py +++ b/ncatbot/api/bilibili/interface.py @@ -131,6 +131,20 @@ async def add_dynamic_watch(self, uid: int) -> None: async def remove_dynamic_watch(self, uid: int) -> None: """移除动态监听""" + @abstractmethod + async def add_dynamic_page_watch(self, uid: int) -> None: + """添加动态页监听(通过动态主页轮询,可同时监听多个 UP 主)""" + + @abstractmethod + async def remove_dynamic_page_watch(self, uid: int) -> None: + """移除动态页监听""" + + # ---- 用户关系 ---- + + @abstractmethod + async def follow_user(self, uid: int) -> Any: + """关注用户""" + # ---- 用户查询 ---- @abstractmethod diff --git a/tests/unit/adapter/test_bilibili_dynamic_page_source.py b/tests/unit/adapter/test_bilibili_dynamic_page_source.py new file mode 100644 index 00000000..dc7a12af --- /dev/null +++ b/tests/unit/adapter/test_bilibili_dynamic_page_source.py @@ -0,0 +1,209 @@ +"""BL-23: DynamicPageSource 多 UID 管理与增量检测测试 + +DynamicPageSource 的 UID 管理、基线初始化、新动态检测逻辑, +不涉及任何网络请求(纯离线单元测试)。 + +规范: + BL-23a: UID 增删管理 — add_uid / has_uid / remove_uid / empty 属性 + BL-23b: 基线初始化 — _init_baselines 仅记录已订阅 UID 的最大时间戳 + BL-23c: 新动态检测 — _detect_new_dynamics 按 ts > last_ts 过滤,升序推送 +""" + +from ncatbot.adapter.bilibili.source.dynamic_page_source import DynamicPageSource + + +# ─── 辅助工具 ─────────────────────────────────────────────────────────────── + + +def _make_item(uid: int, ts: int) -> dict: + """构造最小动态 item(仅含解析所需字段)""" + return { + "id_str": f"{uid}_{ts}", + "type": "DYNAMIC_TYPE_WORD", + "modules": { + "module_author": { + "mid": uid, + "name": f"user_{uid}", + "pub_ts": ts, + } + }, + } + + +class _Collector: + """收集回调调用记录""" + + def __init__(self): + self.calls: list[tuple] = [] + + async def __call__(self, source_type: str, raw_data: dict) -> None: + self.calls.append((source_type, raw_data)) + + +def _make_source( + watched_uids: list[int] | None = None, +) -> tuple[DynamicPageSource, _Collector]: + """创建 DynamicPageSource 实例,不启动轮询 task""" + coll = _Collector() + src = DynamicPageSource(credential=None, callback=coll, poll_interval=999.0) + for uid in watched_uids or []: + src.add_uid(uid) + return src, coll + + +# ─── BL-23a: UID 增删管理 ─────────────────────────────────────────────────── + + +class TestBL23aUidManagement: + """BL-23a: UID 增删管理""" + + def test_bl23a_add_has_uid(self): + src, _ = _make_source() + src.add_uid(111) + assert src.has_uid(111) + + def test_bl23a_remove_uid(self): + src, _ = _make_source([111, 222]) + src.remove_uid(111) + assert not src.has_uid(111) + assert src.has_uid(222) + + def test_bl23a_empty_when_no_uids(self): + src, _ = _make_source() + assert src.empty + + def test_bl23a_not_empty_after_add(self): + src, _ = _make_source([333]) + assert not src.empty + + def test_bl23a_empty_after_remove_all(self): + src, _ = _make_source([444]) + src.remove_uid(444) + assert src.empty + + def test_bl23a_remove_clears_last_ts(self): + src, _ = _make_source([555]) + src._last_ts[555] = 100 + src.remove_uid(555) + assert 555 not in src._last_ts + + def test_bl23a_duplicate_add_is_idempotent(self): + src, _ = _make_source([100]) + src.add_uid(100) + src.add_uid(100) + assert src.has_uid(100) + assert len(src._watched_uids) == 1 + + +# ─── BL-23b: 基线初始化 ───────────────────────────────────────────────────── + + +class TestBL23bInitBaselines: + """BL-23b: 基线初始化""" + + def test_bl23b_sets_last_ts_for_watched_uid(self): + src, _ = _make_source([100]) + # 两条动态,取最大 ts + items = [_make_item(100, 900), _make_item(100, 1000)] + src._init_baselines(items) + assert src._last_ts[100] == 1000 + + def test_bl23b_ignores_unwatched_uid(self): + src, _ = _make_source([100]) + items = [_make_item(999, 5000)] # uid 999 未订阅 + src._init_baselines(items) + assert 999 not in src._last_ts + + def test_bl23b_multiple_uids_independent(self): + src, _ = _make_source([100, 200]) + items = [_make_item(100, 500), _make_item(200, 300)] + src._init_baselines(items) + assert src._last_ts[100] == 500 + assert src._last_ts[200] == 300 + + def test_bl23b_empty_items_leaves_no_baseline(self): + src, _ = _make_source([100]) + src._init_baselines([]) + assert 100 not in src._last_ts + + +# ─── BL-23c: 新动态检测 ───────────────────────────────────────────────────── + + +class TestBL23cNewDynamicDetection: + """BL-23c: 新动态检测与推送""" + + async def test_bl23c_new_dynamic_triggers_callback(self): + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + await src._detect_new_dynamics([_make_item(100, 1001)]) + assert len(coll.calls) == 1 + source_type, raw = coll.calls[0] + assert source_type == "dynamic" + assert raw["uid"] == 100 + assert raw["status"] == "new" + + async def test_bl23c_equal_ts_no_callback(self): + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + await src._detect_new_dynamics([_make_item(100, 1000)]) + assert len(coll.calls) == 0 + + async def test_bl23c_old_ts_no_callback(self): + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + await src._detect_new_dynamics([_make_item(100, 999)]) + assert len(coll.calls) == 0 + + async def test_bl23c_updates_last_ts_after_push(self): + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + await src._detect_new_dynamics([_make_item(100, 1005)]) + assert src._last_ts[100] == 1005 + + async def test_bl23c_multiple_new_items_sorted_ascending(self): + """多条新动态按 pub_ts 升序推送""" + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + items = [_make_item(100, 1030), _make_item(100, 1010), _make_item(100, 1020)] + await src._detect_new_dynamics(items) + assert len(coll.calls) == 3 + pushed_ts = [ + c[1]["dynamic"]["modules"]["module_author"]["pub_ts"] for c in coll.calls + ] + assert pushed_ts == [1010, 1020, 1030] + + async def test_bl23c_unwatched_uid_not_pushed(self): + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + items = [_make_item(999, 9999)] # uid 999 未订阅 + await src._detect_new_dynamics(items) + assert len(coll.calls) == 0 + + async def test_bl23c_no_baseline_zero_treated_as_floor(self): + """未初始化基线时 last_ts 默认 0,任何 ts > 0 均为新动态""" + src, coll = _make_source([100]) + # 不调用 _init_baselines —— _last_ts.get(100, 0) == 0 + await src._detect_new_dynamics([_make_item(100, 1)]) + assert len(coll.calls) == 1 + + async def test_bl23c_multi_uid_isolation(self): + """不同 UID 的检测互不干扰""" + src, coll = _make_source([100, 200]) + src._last_ts[100] = 2000 + src._last_ts[200] = 500 + items = [ + _make_item(100, 1999), # old for uid 100 + _make_item(200, 600), # new for uid 200 + ] + await src._detect_new_dynamics(items) + assert len(coll.calls) == 1 + assert coll.calls[0][1]["uid"] == 200 + + async def test_bl23c_last_ts_is_max_of_pushed(self): + """推送多条后 last_ts 更新为最大 ts""" + src, coll = _make_source([100]) + src._last_ts[100] = 1000 + items = [_make_item(100, 1010), _make_item(100, 1050), _make_item(100, 1030)] + await src._detect_new_dynamics(items) + assert src._last_ts[100] == 1050 diff --git a/tests/unit/adapter/test_bilibili_parser.py b/tests/unit/adapter/test_bilibili_parser.py index 21630f73..74796781 100644 --- a/tests/unit/adapter/test_bilibili_parser.py +++ b/tests/unit/adapter/test_bilibili_parser.py @@ -26,6 +26,7 @@ BL-20: 删除动态解析 — dynamic_event_type 为 DELETED_DYNAMIC BL-21: 转发动态 (DYNAMIC_TYPE_FORWARD) 解析 BL-22: DataPair 时间戳缓存与深拷贝 + BL-23 系列(DynamicPageSource)见 test_bilibili_dynamic_page_source.py """ import json