Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class ChatProviderTemplate(TypedDict):
"enable": False,
"client_id": "",
"client_secret": "",
"card_template_id": "",
},
"Telegram": {
"id": "telegram",
Expand Down Expand Up @@ -582,6 +583,11 @@ class ChatProviderTemplate(TypedDict):
"type": "string",
"hint": "可选:填写 Misskey 网盘中目标文件夹的 ID,上传的文件将放置到该文件夹内。留空则使用账号网盘根目录。",
},
"card_template_id": {
"description": "卡片模板 ID",
"type": "string",
"hint": "可选。钉钉互动卡片模板 ID。启用后将使用互动卡片进行流式回复。",
},
"telegram_command_register": {
"description": "Telegram 命令注册",
"type": "bool",
Expand Down
63 changes: 61 additions & 2 deletions astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def process(self, event: dingtalk_stream.EventMessage):


@register_platform_adapter(
"dingtalk", "钉钉机器人官方 API 适配器", support_streaming_message=False
"dingtalk", "钉钉机器人官方 API 适配器", support_streaming_message=True
)
class DingtalkPlatformAdapter(Platform):
def __init__(
Expand Down Expand Up @@ -75,6 +75,8 @@ async def process(self, message: dingtalk_stream.CallbackMessage):
)
self.client_ = client # 用于 websockets 的 client
self._shutdown_event: threading.Event | None = None
self.card_template_id = platform_config.get("card_template_id")
self.card_instance_id_dict = {}

def _id_to_sid(self, dingtalk_id: str | None) -> str:
if not dingtalk_id:
Expand All @@ -96,9 +98,65 @@ def meta(self) -> PlatformMetadata:
name="dingtalk",
description="钉钉机器人官方 API 适配器",
id=cast(str, self.config.get("id")),
support_streaming_message=False,
support_streaming_message=True,
)

async def create_message_card(
self, message_id: str, incoming_message: dingtalk_stream.ChatbotMessage
):
if not self.card_template_id:
return False

card_instance = dingtalk_stream.AICardReplier(self.client_, incoming_message)
card_data = {"content": ""} # Initial content empty

try:
card_instance_id = await card_instance.async_create_and_deliver_card(
self.card_template_id,
card_data,
)
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
return True
except Exception as e:
logger.error(f"创建钉钉卡片失败: {e}")
return False

async def send_card_message(self, message_id: str, content: str, is_final: bool):
if message_id not in self.card_instance_id_dict:
return

card_instance, card_instance_id = self.card_instance_id_dict[message_id]
content_key = "content"

try:
# 钉钉卡片流式更新

await card_instance.async_streaming(
card_instance_id,
content_key=content_key,
content_value=content,
append=False,
finished=is_final,
failed=False,
)
except Exception as e:
logger.error(f"发送钉钉卡片消息失败: {e}")
# Try to report failure
try:
await card_instance.async_streaming(
card_instance_id,
content_key=content_key,
content_value=content, # Keep existing content
append=False,
finished=True,
failed=True,
)
except Exception:
pass

if is_final:
self.card_instance_id_dict.pop(message_id, None)

async def convert_msg(
self,
message: dingtalk_stream.ChatbotMessage,
Expand Down Expand Up @@ -224,6 +282,7 @@ async def handle_msg(self, abm: AstrBotMessage):
platform_meta=self.meta(),
session_id=abm.session_id,
client=self.client,
adapter=self,
)

self._event_queue.put_nowait(event)
Expand Down
68 changes: 57 additions & 11 deletions astrbot/core/platform/sources/dingtalk/dingtalk_event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import cast
from typing import Any, cast

import dingtalk_stream

Expand All @@ -16,9 +16,11 @@ def __init__(
platform_meta,
session_id,
client: dingtalk_stream.ChatbotHandler,
adapter: "Any" = None,
):
super().__init__(message_str, message_obj, platform_meta, session_id)
self.client = client
self.adapter = adapter

async def send_with_client(
self,
Expand Down Expand Up @@ -83,14 +85,58 @@ async def send(self, message: MessageChain):
await super().send(message)

async def send_streaming(self, generator, use_fallback: bool = False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议将 send_streaming 中共享的缓冲回退逻辑和卡片流式逻辑抽取为独立的辅助方法,以简化主方法的控制流并减少重复。

你可以通过将通用的“缓冲并发送”路径和卡片流式逻辑抽取到几个小的辅助方法中,来降低 send_streaming 的复杂度和重复度。这样既能保留当前所有行为,又能让控制流更清晰、更易维护。

例如:

class DingtalkMessageEvent(AstrMessageEvent):
    ...

    async def _buffer_and_send_fallback(self, generator, use_fallback: bool):
        buffer = None
        async for chain in generator:
            if buffer is None:
                buffer = chain
            else:
                buffer.chain.extend(chain.chain)

        if buffer is None:
            return None

        buffer.squash_plain()
        await self.send(buffer)
        return await super().send_streaming(generator, use_fallback)

    def _can_use_card_streaming(self) -> bool:
        return bool(self.adapter and self.adapter.card_template_id)

    async def _stream_to_card(self, generator, msg_id: str, incoming_msg: Any):
        created = await self.adapter.create_message_card(msg_id, incoming_msg)
        if not created:
            return None  # caller decides how to fallback

        full_content = ""
        seq = 0
        try:
            async for chain in generator:
                for segment in chain.chain:
                    if isinstance(segment, Comp.Plain):
                        full_content += segment.text

                seq += 1
                if seq % 2 == 0:
                    await self.adapter.send_card_message(
                        msg_id, full_content, is_final=False
                    )

            await self.adapter.send_card_message(msg_id, full_content, is_final=True)
        except Exception as e:
            logger.error(f"DingTalk streaming error: {e}")
            await self.adapter.send_card_message(msg_id, full_content, is_final=True)

然后 send_streaming 就可以简化为一个较短的调度方法:

    async def send_streaming(self, generator, use_fallback: bool = False):
        if not self._can_use_card_streaming():
            logger.warning(
                "DingTalk streaming is enabled, but 'card_template_id' is not "
                f"configured for platform '{self.platform_meta.id}'. "
                "Falling back to text streaming."
            )
            return await self._buffer_and_send_fallback(generator, use_fallback)

        msg_id = self.message_obj.message_id
        incoming_msg = self.message_obj.raw_message

        # try card streaming
        result = await self._stream_to_card(generator, msg_id, incoming_msg)
        if result is None:
            # card creation failed -> fallback to original behavior
            return await self._buffer_and_send_fallback(generator, use_fallback)

        return result

这样可以去除重复的缓冲逻辑,将卡片相关的细节隔离出来,并让 send_streaming 专注于高层决策,同时不改变任何现有功能。

Original comment in English

issue (complexity): Consider extracting the shared buffering fallback and card-streaming logic in send_streaming into separate helper methods to simplify the main method’s control flow and reduce duplication.

You can reduce the complexity and duplication in send_streaming by extracting the common “buffer and send” path and the card-streaming logic into small helpers. This keeps all current behavior but makes control flow clearer and easier to maintain.

For example:

class DingtalkMessageEvent(AstrMessageEvent):
    ...

    async def _buffer_and_send_fallback(self, generator, use_fallback: bool):
        buffer = None
        async for chain in generator:
            if buffer is None:
                buffer = chain
            else:
                buffer.chain.extend(chain.chain)

        if buffer is None:
            return None

        buffer.squash_plain()
        await self.send(buffer)
        return await super().send_streaming(generator, use_fallback)

    def _can_use_card_streaming(self) -> bool:
        return bool(self.adapter and self.adapter.card_template_id)

    async def _stream_to_card(self, generator, msg_id: str, incoming_msg: Any):
        created = await self.adapter.create_message_card(msg_id, incoming_msg)
        if not created:
            return None  # caller decides how to fallback

        full_content = ""
        seq = 0
        try:
            async for chain in generator:
                for segment in chain.chain:
                    if isinstance(segment, Comp.Plain):
                        full_content += segment.text

                seq += 1
                if seq % 2 == 0:
                    await self.adapter.send_card_message(
                        msg_id, full_content, is_final=False
                    )

            await self.adapter.send_card_message(msg_id, full_content, is_final=True)
        except Exception as e:
            logger.error(f"DingTalk streaming error: {e}")
            await self.adapter.send_card_message(msg_id, full_content, is_final=True)

Then send_streaming becomes a short orchestration method:

    async def send_streaming(self, generator, use_fallback: bool = False):
        if not self._can_use_card_streaming():
            logger.warning(
                "DingTalk streaming is enabled, but 'card_template_id' is not "
                f"configured for platform '{self.platform_meta.id}'. "
                "Falling back to text streaming."
            )
            return await self._buffer_and_send_fallback(generator, use_fallback)

        msg_id = self.message_obj.message_id
        incoming_msg = self.message_obj.raw_message

        # try card streaming
        result = await self._stream_to_card(generator, msg_id, incoming_msg)
        if result is None:
            # card creation failed -> fallback to original behavior
            return await self._buffer_and_send_fallback(generator, use_fallback)

        return result

This removes the duplicated buffering logic, isolates the card-specific details, and keeps send_streaming focused on high-level decision-making without changing any functionality.

buffer = None
async for chain in generator:
if not self.adapter or not self.adapter.card_template_id:
logger.warning(
f"DingTalk streaming is enabled, but 'card_template_id' is not configured for platform '{self.platform_meta.id}'. Falling back to text streaming."
)
# Fallback to default behavior (buffer and send)
buffer = None
async for chain in generator:
if not buffer:
buffer = chain
else:
buffer.chain.extend(chain.chain)
if not buffer:
return None
buffer.squash_plain()
await self.send(buffer)
return await super().send_streaming(generator, use_fallback)

# Create card
msg_id = self.message_obj.message_id
incoming_msg = self.message_obj.raw_message
created = await self.adapter.create_message_card(msg_id, incoming_msg)

if not created:
# Fallback to default behavior (buffer and send)
buffer = None
async for chain in generator:
if not buffer:
buffer = chain
else:
buffer.chain.extend(chain.chain)
if not buffer:
buffer = chain
else:
buffer.chain.extend(chain.chain)
if not buffer:
return None
buffer.squash_plain()
await self.send(buffer)
return await super().send_streaming(generator, use_fallback)
return None
buffer.squash_plain()
await self.send(buffer)
return await super().send_streaming(generator, use_fallback)

full_content = ""
seq = 0
try:
async for chain in generator:
for segment in chain.chain:
if isinstance(segment, Comp.Plain):
full_content += segment.text
Comment on lines +128 to +130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): 目前卡片流式发送中只包含 Plain 段落,这可能会导致其他类型内容被丢弃。

在这条流式处理路径中,只有 Comp.Plain 段被追加到 full_content,因此 markdown、@提及、图片等内容会从流式卡片中被丢弃,但在非流式或回退流程中仍然存在。如果钉钉卡片支持更丰富的内容类型,建议将更多的 segment 类型映射到卡片字段,以便保持流式与非流式行为的一致性。

Original comment in English

issue (bug_risk): Only Plain segments are included in card streaming, which may drop other content types.

In this streaming path, only Comp.Plain segments are appended to full_content, so markdown, mentions, images, etc. will be dropped from the streamed card while still appearing in non-streaming or fallback flows. If DingTalk cards support richer content, consider mapping additional segment types to card fields so streamed vs non-streamed behavior stays aligned.


seq += 1
if seq % 2 == 0: # Update every 2 chunks to be more responsive than 8
await self.adapter.send_card_message(
msg_id, full_content, is_final=False
)

await self.adapter.send_card_message(msg_id, full_content, is_final=True)
except Exception as e:
logger.error(f"DingTalk streaming error: {e}")
# Try to ensure final state is sent or cleaned up?
await self.adapter.send_card_message(msg_id, full_content, is_final=True)