Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ncatbot/adapter/bilibili/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ async def connect(self) -> None:
session_poll_interval=self._config.session_poll_interval,
comment_poll_interval=self._config.comment_poll_interval,
dynamic_poll_interval=self._config.dynamic_poll_interval,
dynamic_page_poll_interval=self._config.dynamic_page_poll_interval,
)

self._api = BiliBotAPI(self._credential, self._source_manager)
Expand All @@ -115,13 +116,17 @@ async def connect(self) -> None:
for watch in self._config.dynamic_watches:
await self._source_manager.add_dynamic_watch(watch.uid, self._credential)

for watch in self._config.dynamic_page_watches:
await self._api.add_dynamic_page_watch(watch.uid)

self._connected = True
LOG.info(
"Bilibili 适配器已连接 (直播间: %d, 私信: %s, 评论: %d, 动态: %d)",
"Bilibili 适配器已连接 (直播间: %d, 私信: %s, 评论: %d, 动态: %d, 动态页: %d)",
len(self._config.live_rooms),
self._config.enable_session,
len(self._config.comment_watches),
len(self._config.dynamic_watches),
len(self._config.dynamic_page_watches),
)

async def listen(self) -> None:
Expand Down
25 changes: 24 additions & 1 deletion ncatbot/adapter/bilibili/api/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from typing import Optional
from typing import Any, Optional


class DynamicAPIMixin:
Expand Down Expand Up @@ -33,3 +33,26 @@ async def add_dynamic_watch(self, uid: int) -> None:
async def remove_dynamic_watch(self, uid: int) -> None:
"""移除动态监听"""
await self._source_manager.remove_dynamic_watch(uid)

async def add_dynamic_page_watch(self, uid: int) -> None:
"""添加动态页监听(先关注再加入轮询)"""
await self.follow_user(uid)
await self._source_manager.add_dynamic_page_watch(uid, self._credential)

async def remove_dynamic_page_watch(self, uid: int) -> None:
"""移除动态页监听"""
await self._source_manager.remove_dynamic_page_watch(uid)

async def follow_user(self, uid: int) -> Any:
"""关注用户(已关注或关注自己时静默跳过)"""
from bilibili_api.user import User, RelationType
from bilibili_api.exceptions import ResponseCodeException

user = User(uid=uid, credential=self._credential)
try:
return await user.modify_relation(RelationType.SUBSCRIBE)
except ResponseCodeException as e:
# 22001: 不能关注自己 22014: 已经关注
if e.code in (22001, 22014):
return None
raise
2 changes: 2 additions & 0 deletions ncatbot/adapter/bilibili/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class BilibiliConfig(BaseModel):
enable_session: bool = False
comment_watches: List[CommentWatch] = Field(default_factory=list)
dynamic_watches: List[DynamicWatch] = Field(default_factory=list)
dynamic_page_watches: List[DynamicWatch] = Field(default_factory=list)

# 轮询配置
session_poll_interval: float = 6.0
comment_poll_interval: float = 30.0
dynamic_poll_interval: float = 600.0
dynamic_page_poll_interval: float = 180.0

# 连接
max_retry: int = 5
Expand Down
2 changes: 2 additions & 0 deletions ncatbot/adapter/bilibili/source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .session_source import SessionSource
from .comment_source import CommentSource
from .dynamic_source import DynamicSource
from .dynamic_page_source import DynamicPageSource
from .manager import SourceManager

__all__ = [
Expand All @@ -11,5 +12,6 @@
"SessionSource",
"CommentSource",
"DynamicSource",
"DynamicPageSource",
"SourceManager",
]
177 changes: 177 additions & 0 deletions ncatbot/adapter/bilibili/source/dynamic_page_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""动态页数据源 — 轮询动态主页,一次性监听多个订阅 UP 主的新动态

通过 bilibili_api.dynamic.get_dynamic_page_info 拉取动态主页,
按订阅 UID 集合过滤,基于每个 UP 主的最新时间戳增量检测新动态。
"""

from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional, Set

from ncatbot.utils import get_log

from .base import BaseSource

LOG = get_log("DynamicPageSource")

# 默认轮询间隔 3 分钟
_DEFAULT_POLL_INTERVAL = 180.0


class DynamicPageSource(BaseSource):
source_type = "dynamic"

def __init__(
self,
credential: Any,
callback: Callable[[str, dict], Awaitable[None]],
*,
poll_interval: float = _DEFAULT_POLL_INTERVAL,
) -> None:
super().__init__(callback)
self.source_id = "dynamic_page"
self._credential = credential
self._poll_interval = poll_interval
# 订阅的 UP 主 UID 集合
self._watched_uids: Set[int] = set()
# 每个 UP 主的最新已知动态时间戳
self._last_ts: Dict[int, int] = {}
self._task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()

def add_uid(self, uid: int) -> None:
"""添加一个订阅 UID"""
self._watched_uids.add(uid)

def remove_uid(self, uid: int) -> None:
"""移除一个订阅 UID"""
self._watched_uids.discard(uid)
self._last_ts.pop(uid, None)

def has_uid(self, uid: int) -> bool:
return uid in self._watched_uids

@property
def empty(self) -> bool:
return len(self._watched_uids) == 0

async def start(self) -> None:
if self._running:
return
self._stop_event.clear()
self._running = True
self._task = asyncio.create_task(self._poll_loop(), name="dynamic_page_source")
LOG.info(
"动态页源已启动 (轮询间隔 %.0fs, 订阅 %d 个 UID)",
self._poll_interval,
len(self._watched_uids),
)

# ---- 静态工具 ----

@staticmethod
def _extract_uid(item: dict) -> int:
"""从动态 item 中提取作者 UID"""
modules = item.get("modules") or {}
author = modules.get("module_author") or {}
return int(author.get("mid", 0))

@staticmethod
def _extract_pub_ts(item: dict) -> int:
"""提取动态发布时间戳"""
modules = item.get("modules") or {}
author = modules.get("module_author") or {}
return int(author.get("pub_ts", 0))

# ---- 轮询 ----

async def _fetch_page(self) -> list:
"""拉取动态主页第一页"""
from bilibili_api.dynamic import get_dynamic_page_info

resp = await get_dynamic_page_info(credential=self._credential)
return resp.get("items") or []

async def _poll_loop(self) -> None:
# 初次拉取,建立时间戳基线
try:
items = await self._fetch_page()
self._init_baselines(items)
except Exception:
LOG.warning("动态页源初次拉取失败", exc_info=True)

while not self._stop_event.is_set():
try:
await asyncio.wait_for(
self._stop_event.wait(), timeout=self._poll_interval
)
break
except asyncio.TimeoutError:
pass

try:
items = await self._fetch_page()
await self._detect_new_dynamics(items)
except asyncio.CancelledError:
break
except Exception:
LOG.debug("动态页源轮询异常", exc_info=True)

self._running = False

def _init_baselines(self, items: list) -> None:
"""用首次拉取的结果初始化每个订阅 UID 的时间戳基线"""
for item in items:
uid = self._extract_uid(item)
if uid not in self._watched_uids:
continue
ts = self._extract_pub_ts(item)
# 只记录更大的时间戳(同一 UID 可能有多条)
if ts > self._last_ts.get(uid, 0):
self._last_ts[uid] = ts
LOG.debug("动态页源基线: %s", self._last_ts)

async def _detect_new_dynamics(self, items: list) -> None:
"""检测并推送新动态"""
# 按 UID 分组,收集每个订阅 UP 主的动态
uid_items: Dict[int, list] = {}
for item in items:
uid = self._extract_uid(item)
if uid not in self._watched_uids:
continue
uid_items.setdefault(uid, []).append(item)

for uid, dyn_items in uid_items.items():
old_ts = self._last_ts.get(uid, 0)
# 过滤出新动态(时间戳严格大于已知最大值)
new_items = [it for it in dyn_items if self._extract_pub_ts(it) > old_ts]
if not new_items:
continue

# 按时间戳升序推送,确保先发的先推
new_items.sort(key=self._extract_pub_ts)
for item in new_items:
ts = self._extract_pub_ts(item)
raw = {
"source": "dynamic",
"uid": uid,
"status": "new",
"dynamic": item,
}
await self._callback("dynamic", raw)
# 更新时间戳
if ts > self._last_ts.get(uid, 0):
self._last_ts[uid] = ts

async def stop(self) -> None:
self._stop_event.set()
if self._task is not None and not self._task.done():
self._task.cancel()
try:
await self._task
except (asyncio.CancelledError, Exception):
pass
self._running = False
self._task = None
LOG.debug("动态页源已停止")
37 changes: 37 additions & 0 deletions ncatbot/adapter/bilibili/source/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .session_source import SessionSource
from .comment_source import CommentSource
from .dynamic_source import DynamicSource
from .dynamic_page_source import DynamicPageSource

LOG = get_log("SourceManager")

Expand All @@ -32,6 +33,7 @@ def __init__(
session_poll_interval: float = 6.0,
comment_poll_interval: float = 30.0,
dynamic_poll_interval: float = 600.0,
dynamic_page_poll_interval: float = 180.0,
) -> None:
self._callback = callback
self._sources: Dict[str, BaseSource] = {}
Expand All @@ -40,6 +42,7 @@ def __init__(
self._session_poll_interval = session_poll_interval
self._comment_poll_interval = comment_poll_interval
self._dynamic_poll_interval = dynamic_poll_interval
self._dynamic_page_poll_interval = dynamic_page_poll_interval
self._stop_event = asyncio.Event()

# ---- 直播间 ----
Expand Down Expand Up @@ -107,6 +110,40 @@ async def remove_dynamic_watch(self, uid: int) -> None:
if source is not None:
await source.stop()

# ---- 动态页(多 UP 主合并轮询) ----

def _get_dynamic_page_source(self, credential: Any) -> DynamicPageSource:
"""获取或创建全局唯一的 DynamicPageSource"""
key = "dynamic_page"
if key not in self._sources:
source = DynamicPageSource(
credential=credential,
callback=self._callback,
poll_interval=self._dynamic_page_poll_interval,
)
self._sources[key] = source
return self._sources[key] # type: ignore[return-value]

async def add_dynamic_page_watch(self, uid: int, credential: Any) -> None:
source = self._get_dynamic_page_source(credential)
if source.has_uid(uid):
LOG.warning("动态页源 UID %s 已存在,跳过", uid)
return
source.add_uid(uid)
if not source.running:
await source.start()
LOG.info("动态页源新增监听 UID: %s", uid)

async def remove_dynamic_page_watch(self, uid: int) -> None:
key = "dynamic_page"
source = self._sources.get(key)
if source is None or not isinstance(source, DynamicPageSource):
return
source.remove_uid(uid)
if source.empty:
await source.stop()
self._sources.pop(key, None)

# ---- 评论 ----

async def add_comment_watch(
Expand Down
14 changes: 14 additions & 0 deletions ncatbot/api/bilibili/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ async def add_dynamic_watch(self, uid: int) -> None:
async def remove_dynamic_watch(self, uid: int) -> None:
"""移除动态监听"""

@abstractmethod
async def add_dynamic_page_watch(self, uid: int) -> None:
"""添加动态页监听(通过动态主页轮询,可同时监听多个 UP 主)"""

@abstractmethod
async def remove_dynamic_page_watch(self, uid: int) -> None:
"""移除动态页监听"""

# ---- 用户关系 ----

@abstractmethod
async def follow_user(self, uid: int) -> Any:
"""关注用户"""

# ---- 用户查询 ----

@abstractmethod
Expand Down
Loading
Loading