Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ncatbot/adapter/bilibili/api/danmu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

class DanmuAPIMixin:
async def send_danmu(self, room_id: int, text: str) -> Any:
from bilibili_api.utils.danmaku import Danmaku

room = self._get_room(room_id)
return await room.send_danmaku(text)
return await room.send_danmaku(Danmaku(text=text))

def _get_room(self, room_id: int) -> Any:
"""获取或创建 LiveRoom 实例(带缓存)"""
Expand Down
9 changes: 6 additions & 3 deletions ncatbot/adapter/bilibili/api/room_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from __future__ import annotations

from typing import Any
from typing import Any, Optional

from ncatbot.types.bilibili.models import LiveRoomInfo


class RoomManageAPIMixin:
Expand All @@ -23,6 +25,7 @@ async def set_room_silent(self, room_id: int, enable: bool, **kwargs: Any) -> An
)
return await room.del_room_silent()

async def get_room_info(self, room_id: int) -> dict:
async def get_room_info(self, room_id: int) -> Optional[LiveRoomInfo]:
room = self._get_room(room_id)
return await room.get_room_info()
raw = await room.get_room_info()
return LiveRoomInfo.from_raw(raw)
14 changes: 12 additions & 2 deletions ncatbot/adapter/bilibili/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
WatchedChangeEventData,
)
from ncatbot.types.bilibili.enums import BiliLiveEventType, BiliSessionEventType
from ncatbot.types.bilibili.models import LiveRoomInfo
from ncatbot.types.bilibili.sender import BiliSender
from ncatbot.types.common.base import BaseEventData
from ncatbot.types.common.segment.array import MessageArray
Expand Down Expand Up @@ -84,7 +85,13 @@ def _parse_live(self, callback_info: dict) -> Optional[BaseEventData]:

parser = _LIVE_PARSERS.get(cmd)
if parser is not None:
return parser(data, common)
result = parser(data, common)
# 开播事件:附加直播间信息
if isinstance(result, LiveStatusEventData) and result.status == "live":
raw_room_info = callback_info.get("room_info")
if raw_room_info is not None:
result.room_info = LiveRoomInfo.from_raw(raw_room_info)
return result

# 系统事件
if cmd in ("VERIFICATION_SUCCESSFUL", "DISCONNECT", "TIMEOUT"):
Expand Down Expand Up @@ -302,7 +309,10 @@ def _parse_view(data: Any, common: dict) -> ViewEventData:


def _parse_live_status(data: dict, common: dict, status: str) -> LiveStatusEventData:
return LiveStatusEventData(**common, status=status)
live_event_type = (
BiliLiveEventType.LIVE if status == "live" else BiliLiveEventType.PREPARING
)
return LiveStatusEventData(**common, live_event_type=live_event_type, status=status)


def _parse_room_change(data: dict, common: dict) -> RoomChangeEventData:
Expand Down
115 changes: 96 additions & 19 deletions ncatbot/adapter/bilibili/source/live_source.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
"""直播间数据源 — 每个实例监听一个直播间的 WebSocket 弹幕流"""
"""直播间数据源 — 每个实例在独立线程中监听一个直播间的 WebSocket 弹幕流

每个 LiveSource 拥有独立的线程和事件循环,避免多房间或
LiveDanmaku 内部阻塞操作影响主事件循环。事件通过
``asyncio.run_coroutine_threadsafe`` 线程安全地回调到主循环。
"""

from __future__ import annotations

import asyncio
import threading
from typing import Any, Awaitable, Callable, Optional

from ncatbot.utils import get_log
Expand Down Expand Up @@ -31,11 +37,18 @@ def __init__(
self._max_retry = max_retry
self._retry_after = retry_after
self._danmaku: Optional["Any"] = None
self._task: Optional[asyncio.Task] = None
self._thread: Optional[threading.Thread] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._main_loop: Optional[asyncio.AbstractEventLoop] = None
self._ready = threading.Event()

async def start(self) -> None:
if self._running:
return

self._main_loop = asyncio.get_running_loop()
self._ready.clear()

from bilibili_api.live import LiveDanmaku

self._danmaku = LiveDanmaku(
Expand All @@ -47,37 +60,101 @@ async def start(self) -> None:

@self._danmaku.on("ALL")
async def _on_all(event: dict) -> None:
await self._callback("live", event)
# 开播事件:获取直播间信息附加到事件数据
if event.get("type") == "LIVE":
data = event.get("data", {})
if isinstance(data, dict) and data.get("live_time", 0):
await self._attach_room_info(event)

# 线程安全地将事件回调到主事件循环
if self._main_loop is not None and self._main_loop.is_running():
asyncio.run_coroutine_threadsafe(
self._callback("live", event), self._main_loop
)

self._running = True
self._task = asyncio.create_task(
self._run_danmaku(), name=f"live_source_{self._room_id}"
self._thread = threading.Thread(
target=self._run_thread,
name=f"LiveSource({self._room_id})",
daemon=True,
)
LOG.info("直播源 %s 已启动", self._room_id)
self._thread.start()
LOG.info("直播源 %s 已启动 (线程: %s)", self._room_id, self._thread.name)

def _run_thread(self) -> None:
"""线程入口:创建独立事件循环并运行 LiveDanmaku 连接。"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._loop = loop
try:
loop.run_until_complete(self._connect_danmaku())
except Exception:
LOG.exception("直播源 %s 线程异常退出", self._room_id)
finally:
self._running = False
try:
loop.run_until_complete(loop.shutdown_asyncgens())
except Exception:
pass
loop.close()
self._loop = None

async def _attach_room_info(self, event: dict) -> None:
"""在子线程事件循环中获取直播间信息并附加到事件数据。"""
try:
from bilibili_api.live import LiveRoom

room = LiveRoom(
room_display_id=self._room_id,
credential=self._credential,
)
raw_info = await room.get_room_info()
event["room_info"] = raw_info
LOG.info(
"直播源 %s 开播,已获取直播间信息: %s",
self._room_id,
raw_info.get("room_info", {}).get("title", ""),
)
except Exception:
LOG.warning("直播源 %s 获取直播间信息失败", self._room_id, exc_info=True)

async def _connect_danmaku(self) -> None:
"""在独立事件循环中运行弹幕连接。"""

@self._danmaku.on("VERIFICATION_SUCCESSFUL")
async def _on_connected(_: dict) -> None:
self._ready.set()
LOG.debug("直播源 %s WebSocket 验证成功", self._room_id)

async def _run_danmaku(self) -> None:
try:
await self._danmaku.connect()
except asyncio.CancelledError:
pass
except Exception:
LOG.exception("直播源 %s 异常退出", self._room_id)
LOG.exception("直播源 %s 连接异常", self._room_id)
finally:
self._running = False
self._ready.set() # 防止异常时 ready 永远阻塞

async def stop(self) -> None:
if self._danmaku is not None:
if self._danmaku is not None and self._loop is not None:
loop = self._loop
try:
await self._danmaku.disconnect()
future = asyncio.run_coroutine_threadsafe(
self._danmaku.disconnect(), loop
)
future.result(timeout=10)
except Exception:
pass
if self._task is not None and not self._task.done():
self._task.cancel()
try:
await self._task
except (asyncio.CancelledError, Exception):
pass
LOG.debug("断开直播源 %s 时异常", self._room_id, exc_info=True)
# 通知子线程事件循环停止
loop.call_soon_threadsafe(loop.stop)

if self._thread is not None and self._thread.is_alive():
self._thread.join(timeout=15)
if self._thread.is_alive():
LOG.warning("直播源 %s 线程未能在超时内退出", self._room_id)

self._running = False
self._danmaku = None
self._task = None
self._thread = None
self._loop = None
LOG.info("直播源 %s 已停止", self._room_id)
5 changes: 3 additions & 2 deletions ncatbot/api/bilibili/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from ncatbot.api.base import IAPIClient
from ncatbot.types.bilibili.models import LiveRoomInfo


class IBiliAPIClient(IAPIClient):
Expand Down Expand Up @@ -58,7 +59,7 @@ async def set_room_silent(self, room_id: int, enable: bool, **kwargs: Any) -> An
"""全员禁言"""

@abstractmethod
async def get_room_info(self, room_id: int) -> dict:
async def get_room_info(self, room_id: int) -> Optional[LiveRoomInfo]:
"""获取直播间信息"""

# ---- 私信 ----
Expand Down
8 changes: 8 additions & 0 deletions ncatbot/core/registry/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ def on_like(self, priority: int = 0, **metadata: Any) -> Callable:
"""注册点赞事件 handler"""
return self.on("live.like_info_v3_click", priority=priority, **metadata)

def on_live_start(self, priority: int = 0, **metadata: Any) -> Callable:
"""注册开播事件 handler"""
return self.on("live.live", priority=priority, **metadata)

def on_live_end(self, priority: int = 0, **metadata: Any) -> Callable:
"""注册下播事件 handler"""
return self.on("live.preparing", priority=priority, **metadata)

# ---- 评论事件 ----

def on_comment(self, priority: int = 0, **metadata: Any) -> Callable:
Expand Down
3 changes: 3 additions & 0 deletions ncatbot/types/bilibili/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BiliSessionEventType,
)
from .sender import BiliSender
from .models import LiveRoomInfo
from .events import (
BiliCommentEventData,
BiliConnectionEventData,
Expand Down Expand Up @@ -54,4 +55,6 @@
"BiliCommentEventData",
# system events
"BiliConnectionEventData",
# models
"LiveRoomInfo",
]
4 changes: 4 additions & 0 deletions ncatbot/types/bilibili/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from __future__ import annotations


from typing import Optional

from pydantic import Field

from ncatbot.types.common.base import BaseEventData
from ncatbot.types.common.segment.array import MessageArray
from .enums import BiliPostType, BiliLiveEventType, BiliCommentEventType
from .models import LiveRoomInfo
from .sender import BiliSender

__all__ = [
Expand Down Expand Up @@ -130,6 +133,7 @@ class LiveStatusEventData(BiliLiveEventData):
"""开播/下播"""

status: str = ""
room_info: Optional[LiveRoomInfo] = None


class RoomChangeEventData(BiliLiveEventData):
Expand Down
Loading
Loading