-
-
Notifications
You must be signed in to change notification settings - Fork 22
feat(uniseg): 添加云湖平台适配器支持 #130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
molanp
wants to merge
3
commits into
nonebot:master
Choose a base branch
from
molanp:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+332
−0
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
17 changes: 17 additions & 0 deletions
17
src/nonebot_plugin_alconna/uniseg/adapters/yunhu/__init__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| from nonebot_plugin_alconna.uniseg.constraint import SupportAdapter | ||
| from nonebot_plugin_alconna.uniseg.loader import BaseLoader | ||
|
|
||
|
|
||
| class Loader(BaseLoader): | ||
| def get_adapter(self) -> SupportAdapter: | ||
| return SupportAdapter.yunhu | ||
|
|
||
| def get_builder(self): | ||
| from .builder import YunHuMessageBuilder | ||
|
|
||
| return YunHuMessageBuilder() | ||
|
|
||
| def get_exporter(self): | ||
| from .exporter import YunHuMessageExporter | ||
|
|
||
| return YunHuMessageExporter() |
109 changes: 109 additions & 0 deletions
109
src/nonebot_plugin_alconna/uniseg/adapters/yunhu/builder.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from nonebot.adapters import Bot, Event | ||
| from nonebot.adapters.yunhu.event import MessageEvent | ||
| from nonebot.adapters.yunhu.message import At as AtSegment | ||
| from nonebot.adapters.yunhu.message import Buttons as ButtonsSegment | ||
| from nonebot.adapters.yunhu.message import Face as FaceSegment | ||
| from nonebot.adapters.yunhu.message import File as FileSegment | ||
| from nonebot.adapters.yunhu.message import Html as HtmlSegment | ||
| from nonebot.adapters.yunhu.message import Image as ImageSegment | ||
| from nonebot.adapters.yunhu.message import Markdown as MarkdownSegment | ||
| from nonebot.adapters.yunhu.message import Message | ||
| from nonebot.adapters.yunhu.message import Text as TextSegment | ||
| from nonebot.adapters.yunhu.message import Video as VideoSegment | ||
| from nonebot.adapters.yunhu.models import Reply as ReplySegement | ||
|
|
||
| from nonebot_plugin_alconna.uniseg.builder import MessageBuilder, build | ||
| from nonebot_plugin_alconna.uniseg.constraint import SupportAdapter | ||
| from nonebot_plugin_alconna.uniseg.segment import At, Button, Emoji, File, Image, Keyboard, Reply, Text, Video | ||
|
|
||
|
|
||
| class YunHuMessageBuilder(MessageBuilder): | ||
| @classmethod | ||
| def get_adapter(cls) -> SupportAdapter: | ||
| return SupportAdapter.yunhu | ||
|
|
||
| @build("text") | ||
| def text(self, seg: TextSegment): | ||
| return Text(seg.data["text"]) | ||
|
|
||
| @build("markdown") | ||
| def markdown(self, seg: MarkdownSegment): | ||
| content = seg.data["text"] | ||
| return Text(content).mark(0, len(content), "markdown") | ||
|
|
||
| @build("html") | ||
| def html(self, seg: HtmlSegment): | ||
| content = seg.data["text"] | ||
| return Text(content).mark(0, len(content), "html") | ||
|
|
||
| @build("at") | ||
| def at(self, seg: AtSegment): | ||
| return At("user", seg.data["user_id"], seg.data["name"]) | ||
|
|
||
| @build("face") | ||
| def face(self, seg: FaceSegment): | ||
| return Emoji(seg.data["code"], seg.data["emoji"]) | ||
|
|
||
| @build("image") | ||
| def image(self, seg: ImageSegment): | ||
|
|
||
| if seg.data["url"]: | ||
| return Image(id=seg.data["imageKey"], url=seg.data["url"]) | ||
| if seg.data["raw"]: | ||
| return Image(id=seg.data["imageKey"], raw=seg.data["raw"]) | ||
| return Image(id=seg.data["imageKey"]) | ||
|
|
||
| @build("video") | ||
| def video(self, seg: VideoSegment): | ||
| if seg.data["url"]: | ||
| return Video(id=seg.data["videoKey"], url=seg.data["url"]) | ||
| if seg.data["raw"]: | ||
| return Video(id=seg.data["videoKey"], raw=seg.data["raw"]) | ||
| return Video(id=seg.data["videoKey"]) | ||
|
|
||
| @build("file") | ||
| def file(self, seg: FileSegment): | ||
| if seg.data["url"]: | ||
| return File(id=seg.data["fileKey"], url=seg.data["url"]) | ||
| if seg.data["raw"]: | ||
| return File(id=seg.data["fileKey"], raw=seg.data["raw"]) | ||
| return File(id=seg.data["fileKey"]) | ||
|
|
||
| @build("keyboard") | ||
| def keyboard(self, seg: ButtonsSegment): | ||
| kbs = [] | ||
| btns = seg.data["buttons"] | ||
| for kb in btns: | ||
| buttons = [] | ||
| for button in kb: | ||
| if button["actionType"] == 1: | ||
| flag = "link" | ||
| elif button["actionType"] == 2: | ||
| flag = "input" | ||
| elif button["actionType"] == 3: | ||
| flag = "action" | ||
| buttons.append( | ||
| Button( | ||
| flag=flag, | ||
| label=button["text"], | ||
| url=button["url"] if button["actionType"] == 1 else None, | ||
| text=button["value"] if button["actionType"] == 2 else None, | ||
| ) | ||
| ) | ||
| kbs.append(Keyboard(buttons=buttons)) | ||
| return kbs | ||
|
|
||
| async def extract_reply(self, event: Event, bot: Bot): | ||
| if TYPE_CHECKING: | ||
| assert isinstance(event, MessageEvent) | ||
| if rpl := event.reply: | ||
| if TYPE_CHECKING: | ||
| assert isinstance(rpl, ReplySegement) | ||
| return Reply( | ||
| rpl.msgId, | ||
| Message.deserialize(rpl.content, rpl.content.at, rpl.contentType, rpl.commandName), | ||
| rpl, | ||
| ) | ||
| return None |
201 changes: 201 additions & 0 deletions
201
src/nonebot_plugin_alconna/uniseg/adapters/yunhu/exporter.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING, Any, Union, cast | ||
|
|
||
| from nonebot.adapters import Bot, Event | ||
| from nonebot.adapters.yunhu.bot import Bot as YunHuBot | ||
| from nonebot.adapters.yunhu.event import Event as YunHuEvent | ||
| from nonebot.adapters.yunhu.event import MessageEvent, NoticeEvent | ||
| from nonebot.adapters.yunhu.message import Message, MessageSegment | ||
| from nonebot.adapters.yunhu.models import BaseNotice, ButtonBody, SendMsgResponse | ||
| from tarina import lang | ||
|
|
||
| from nonebot_plugin_alconna.uniseg.constraint import SupportScope | ||
| from nonebot_plugin_alconna.uniseg.exporter import MessageExporter, SerializeFailed, SupportAdapter, Target, export | ||
| from nonebot_plugin_alconna.uniseg.segment import At, Button, Emoji, File, Image, Keyboard, Reply, Text, Video | ||
|
|
||
|
|
||
| class YunHuMessageExporter(MessageExporter[Message]): | ||
| @classmethod | ||
| def get_adapter(cls) -> SupportAdapter: | ||
| return SupportAdapter.yunhu | ||
|
|
||
| def get_message_type(self): | ||
| return Message | ||
|
|
||
| def get_target(self, event: Event, bot: Union[Bot, None] = None) -> Target: | ||
| if isinstance(event, MessageEvent): | ||
| return Target( | ||
| event.event.sender.senderId, | ||
| event.event.chat.chatId if event.event.chat.chatType == "group" else "", | ||
| private=(event.event.chat.chatType == "bot"), | ||
| source=event.event.message.msgId, | ||
| adapter=self.get_adapter(), | ||
| self_id=bot.self_id if bot else None, | ||
| scope=SupportScope.yunhu, | ||
| ) | ||
| if isinstance(event, NoticeEvent): | ||
| if TYPE_CHECKING: | ||
| assert isinstance(event.event, BaseNotice) | ||
| return Target( | ||
| event.get_user_id(), | ||
| event.event.chatId, | ||
| private=(event.event.chatType == "user"), | ||
| source=event.header.eventId, | ||
| adapter=self.get_adapter(), | ||
| self_id=bot.self_id if bot else None, | ||
| scope=SupportScope.yunhu, | ||
| ) | ||
| raise NotImplementedError | ||
|
|
||
| def get_message_id(self, event: Event) -> str: | ||
| assert isinstance( | ||
| event, | ||
| MessageEvent, | ||
| ) | ||
| return event.event.message.msgId | ||
|
|
||
| @export | ||
| async def text(self, seg: Text, bot: Union[Bot, None]) -> "MessageSegment": | ||
| if seg.extract_most_style() == "markdown": | ||
| return MessageSegment.markdown(seg.text) | ||
| if seg.extract_most_style() == "html": | ||
| return MessageSegment.html(seg.text) | ||
| if seg.styles: | ||
| return MessageSegment.markdown(str(seg)) | ||
| return MessageSegment.text(seg.text) | ||
|
|
||
| @export | ||
| async def at(self, seg: At, bot: Union[Bot, None]) -> "MessageSegment": | ||
| if seg.flag == "user": | ||
| return MessageSegment.at(seg.target, seg.display or "") | ||
| raise SerializeFailed( | ||
| lang.require("nbp-uniseg", "failed_segment").format(adapter="yunhu", seg=seg, target="at") | ||
| ) | ||
|
|
||
| @export | ||
| async def face(self, seg: Emoji, bot: Union[Bot, None]) -> "MessageSegment": | ||
| return MessageSegment.face(seg.id, seg.name or "") | ||
|
|
||
| @export | ||
| async def media(self, seg: Union[Image, Video, File], bot: Union[Bot, None]) -> "MessageSegment": | ||
| name = seg.__class__.__name__.lower() | ||
| method = { | ||
| "image": MessageSegment.image, | ||
| "video": MessageSegment.video, | ||
| "file": MessageSegment.file, | ||
| }[name] | ||
|
|
||
| if seg.url: | ||
| return method(url=seg.url) | ||
| if seg.raw: | ||
| return method(raw=seg.raw_bytes) | ||
| if seg.path: | ||
| return method(raw=Path(seg.path).read_bytes()) | ||
| raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type="image", seg=seg)) | ||
|
|
||
| @export | ||
| async def reply(self, seg: Reply, bot: Union[Bot, None]) -> "MessageSegment": | ||
| return MessageSegment("$yunhu:reply", {"message_id": seg.id}) | ||
|
|
||
| def _button(self, seg: Button, bot: Union[Bot, None]) -> ButtonBody: | ||
| label = str(seg.label) | ||
| if seg.flag == "link": | ||
| return {"text": label, "actionType": 1, "url": seg.url} # pyright: ignore[reportReturnType] | ||
| if seg.flag == "action": | ||
| return {"text": label, "actionType": 3} # pyright: ignore[reportReturnType] | ||
| return {"text": label, "actionType": 2, "value": seg.text} # pyright: ignore[reportReturnType] | ||
|
|
||
| @export | ||
| async def button(self, seg: Button, bot: Union[Bot, None]): | ||
| return MessageSegment("$yunhu:button", {"button": self._button(seg, bot)}) | ||
|
|
||
| @export | ||
| async def keyboard(self, seg: Keyboard, bot: Union[Bot, None]): | ||
| if not seg.children: | ||
| raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type="keyboard", seg=seg)) | ||
| buttons = [self._button(but, bot) for but in seg.children] | ||
| if not seg.row: | ||
| return MessageSegment("$yunhu:button_row", {"buttons": buttons}) | ||
| rows = [buttons[i : i + (seg.row or 9)] for i in range(0, len(buttons), seg.row or 9)] | ||
| return MessageSegment("$yunhu:keyboard", {"buttons": rows}) | ||
|
|
||
| async def send_to(self, target: Union[Target, YunHuEvent], bot: Bot, message: Message, **kwargs): | ||
| assert isinstance(bot, YunHuBot) | ||
| if TYPE_CHECKING: | ||
| assert isinstance(message, self.get_message_type()) | ||
|
|
||
| kb = None | ||
| message_id: str | None = None | ||
|
|
||
| # 处理 button | ||
| if buttons := message.get("$yunhu:button"): | ||
| message = message.exclude("$yunhu:button") | ||
| buts = [but.data["button"] for but in buttons] | ||
| kb = [buts[i : i + 9] for i in range(0, len(buts), 9)] | ||
|
|
||
| # 处理 button_row | ||
| if rows := message.get("$yunhu:button_row"): | ||
| message = message.exclude("$yunhu:button_row") | ||
| but_rows = [row.data["buttons"] for row in rows] | ||
| if not kb: | ||
| kb = but_rows | ||
| else: | ||
| kb.extend(but_rows) | ||
|
|
||
| # 处理 keyboard | ||
| if keyboard := message.get("$yunhu:keyboard"): | ||
| message = message.exclude("$yunhu:keyboard") | ||
| keyboard_buttons = keyboard[0].data["buttons"] | ||
| if not kb: | ||
| kb = keyboard_buttons | ||
| else: | ||
| kb.extend(keyboard_buttons) | ||
|
|
||
| if kb: | ||
| message.append(MessageSegment.buttons(kb)) | ||
|
|
||
| # 处理 reply | ||
| if reply_segments := message.get("$yunhu:reply"): | ||
| message = message.exclude("$yunhu:reply") | ||
| raw_inner_id = reply_segments[0].data.get("message_id") | ||
| message_id = str(raw_inner_id) if raw_inner_id else None | ||
| else: | ||
| message_id = None | ||
|
|
||
| if isinstance(target, YunHuEvent): | ||
| if message_id: | ||
| return await bot.send(event=target, message=message, reply_to=message_id) | ||
| return await bot.send(event=target, message=message) | ||
|
|
||
| content, content_type = message.serialize() | ||
| return await bot.send_msg( | ||
| receive_type=("user" if target.private else "group"), | ||
| receive_id=target.id, | ||
| content=content, | ||
| content_type=content_type, | ||
| parent_id=message_id, | ||
| ) | ||
|
|
||
| async def recall(self, mid: Any, bot: Bot, context: Union[Target, Event]): | ||
| assert isinstance(bot, YunHuBot) | ||
| if isinstance(mid, (str, int)) and isinstance(context, MessageEvent): | ||
| if context.event.message.chatType == "bot": | ||
| chat_id = context.event.sender.senderId | ||
| chat_type = "user" | ||
| else: | ||
| chat_id = context.event.message.chatId | ||
| chat_type = "group" | ||
| await bot.delete_msg(message_id=str(mid), chat_id=chat_id, chat_type=chat_type) | ||
| else: | ||
| _mid: SendMsgResponse = cast(SendMsgResponse, mid) | ||
| assert _mid.data | ||
| await bot.delete_msg( | ||
| message_id=_mid.data.messageInfo.msgId, | ||
| chat_id=_mid.data.messageInfo.recvId, | ||
| chat_type=_mid.data.messageInfo.recvType, | ||
| ) | ||
|
|
||
| def get_reply(self, mid: Any): | ||
| if isinstance(mid, MessageEvent): | ||
| return Reply(mid.event.message.msgId) | ||
| raise NotImplementedError | ||
molanp marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.