Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -3217,6 +3217,7 @@ class ChatProviderTemplate(TypedDict):
"string": "",
"text": "",
"list": [],
"file": [],
"object": {},
"template_list": [],
}
335 changes: 334 additions & 1 deletion astrbot/dashboard/routes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@
from astrbot.core.provider import Provider
from astrbot.core.provider.register import provider_registry
from astrbot.core.star.star import star_registry
from astrbot.core.utils.astrbot_path import (
get_astrbot_plugin_data_path,
get_astrbot_temp_path,
)
from astrbot.core.utils.llm_metadata import LLM_METADATAS
from astrbot.core.utils.io import remove_dir
from astrbot.core.utils.webhook_utils import ensure_platform_webhook_config

from .route import Response, Route, RouteContext

MAX_FILE_BYTES = 500 * 1024 * 1024


def try_cast(value: Any, type_: str):
if type_ == "int":
Expand Down Expand Up @@ -106,6 +113,16 @@ def validate(data: dict, metadata: dict = schema, path=""):
_validate_template_list(value, meta, f"{path}{key}", errors, validate)
continue

if meta["type"] == "file":
if not _expect_type(value, list, f"{path}{key}", errors, "list"):
continue
for idx, item in enumerate(value):
if not isinstance(item, str):
errors.append(
f"Invalid type {path}{key}[{idx}]: expected string, got {type(item).__name__}",
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error messages for file type validation use English while other validation error messages in the same function use Chinese. For consistency, these messages should also use Chinese to match the existing error message style in this codebase.

Suggested change
f"Invalid type {path}{key}[{idx}]: expected string, got {type(item).__name__}",
f"错误的类型 {path}{key}[{idx}]: 期望是 string, 得到了 {type(item).__name__}",

Copilot uses AI. Check for mistakes.
)
continue

if meta["type"] == "list" and not isinstance(value, list):
errors.append(
f"错误的类型 {path}{key}: 期望是 list, 得到了 {type(value).__name__}",
Expand Down Expand Up @@ -218,6 +235,8 @@ def __init__(
"/config/default": ("GET", self.get_default_config),
"/config/astrbot/update": ("POST", self.post_astrbot_configs),
"/config/plugin/update": ("POST", self.post_plugin_configs),
"/config/plugin/file/upload": ("POST", self.upload_plugin_file),
"/config/plugin/file/delete": ("POST", self.delete_plugin_file),
"/config/platform/new": ("POST", self.post_new_platform),
"/config/platform/update": ("POST", self.post_update_platform),
"/config/platform/delete": ("POST", self.post_delete_platform),
Expand Down Expand Up @@ -876,6 +895,313 @@ async def post_plugin_configs(self):
except Exception as e:
return Response().error(str(e)).__dict__


def _get_plugin_metadata_by_name(self, plugin_name: str):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议将新增加的插件文件/配置处理逻辑抽取到独立的辅助模块中,这样路由类可以保持精简,只负责编排 HTTP 流程。

你可以通过把文件/配置相关逻辑集中到一个专门的 helper 中,并让路由类保持“瘦身”,显著降低新增的复杂度。这样既保留所有行为,又把底层细节局部化。

1. 抽取一个 PluginFileConfigManager

将辅助函数和核心逻辑从 AstrBotConfig 中移到一个独立模块,例如 astrbot/core/config/plugin_file_config.py

# astrbot/core/config/plugin_file_config.py

import os
from typing import Iterable
from astrbot.core.utils.astrbot_path import (
    get_astrbot_plugin_data_path,
    get_astrbot_temp_path,
)
from astrbot.core.utils.io import remove_dir


def sanitize_filename(name: str) -> str:
    cleaned = os.path.basename(name).strip()
    if not cleaned or cleaned in {".", ".."}:
        return ""
    for sep in (os.sep, os.altsep):
        if sep:
            cleaned = cleaned.replace(sep, "_")
    return cleaned


def sanitize_path_segment(segment: str) -> str:
    cleaned = []
    for ch in segment:
        if (
            ("a" <= ch <= "z")
            or ("A" <= ch <= "Z")
            or ch.isdigit()
            or ch in {"-", "_"}
        ):
            cleaned.append(ch)
        else:
            cleaned.append("_")
    result = "".join(cleaned).strip("_")
    return result or "_"


def config_key_to_folder(key_path: str) -> str:
    parts = [sanitize_path_segment(p) for p in key_path.split(".") if p]
    return "/".join(parts) if parts else "_"


def normalize_rel_path(rel_path: str | None) -> str | None:
    if not isinstance(rel_path, str):
        return None
    rel = rel_path.replace("\\", "/").lstrip("/")
    if not rel:
        return None
    parts = [p for p in rel.split("/") if p]
    if any(part in {".", ".."} for part in parts):
        return None
    if rel.startswith("../") or "/../" in rel:
        return None
    return "/".join(parts)

将列表/Schema/文件系统相关逻辑也放在同一模块中:

def normalize_file_list(value, key_path: str) -> tuple[list[str], bool]:
    if value is None:
        return [], False
    if not isinstance(value, list):
        raise ValueError(f"Invalid file list for {key_path}")
    folder = config_key_to_folder(key_path)
    expected_prefix = f"files/{folder}/"
    results: list[str] = []
    changed = False
    for item in value:
        if not isinstance(item, str):
            raise ValueError(f"Invalid file entry for {key_path}")
        rel = normalize_rel_path(item)
        if not rel or not rel.startswith("files/"):
            raise ValueError(f"Invalid file path: {item}")
        if rel.startswith(expected_prefix):
            results.append(rel)
            continue
        if rel.count("/") == 1:
            filename = rel.split("/", 1)[1]
            if not filename:
                raise ValueError(f"Invalid file path: {item}")
            results.append(f"{expected_prefix}{filename}")
            changed = True
            continue
        raise ValueError(f"Invalid file path: {item}")
    return results, changed


def apply_plugin_file_ops(plugin_name: str, md, post_configs: dict) -> None:
    schema = getattr(md.config, "schema", None) if md and md.config else None
    if not isinstance(schema, dict):
        return

    # you can also move _collect_file_keys/_get_value_by_path/_set_value_by_path here

    # ... existing logic moved verbatim from AstrBotConfig._apply_plugin_file_ops ...

你也可以将 _collect_file_keys_get_value_by_path_set_value_by_path_get_schema_item 挪到这个模块里,这样所有 Schema/路径相关的知识都集中在一处。

2. 让路由类只负责编排

这样 AstrBotConfig 基本上就只负责编排逻辑,更容易阅读:

# config.py
from astrbot.core.config.plugin_file_config import (
    sanitize_filename,
    normalize_rel_path,
    config_key_to_folder,
    apply_plugin_file_ops,
)

async def upload_plugin_file(self):
    plugin_name = request.args.get("plugin_name")
    key_path = request.args.get("key")
    if not plugin_name or not key_path:
        return Response().error("Missing plugin_name or key parameter").__dict__

    md = self._get_plugin_metadata_by_name(plugin_name)
    if not md or not md.config:
        return Response().error(
            f"Plugin {plugin_name} not found or has no config",
        ).__dict__

    meta = self._get_schema_item(md.config.schema, key_path)
    if not meta or meta.get("type") != "file":
        return Response().error("Config item not found or not file type").__dict__

    file_types = meta.get("file_types")
    allowed_exts = []
    if isinstance(file_types, list):
        allowed_exts = [str(ext).lstrip(".").lower() for ext in file_types if str(ext).strip()]

    files = await request.files
    if not files:
        return Response().error("No files uploaded").__dict__

    staging_root = os.path.join(
        get_astrbot_temp_path(),
        "plugin_file_uploads",
        plugin_name,
    )
    os.makedirs(staging_root, exist_ok=True)

    uploaded = []
    folder = config_key_to_folder(key_path)
    errors = []
    for file in files.values():
        filename = sanitize_filename(file.filename or "")
        # ... unchanged logic using helpers from plugin_file_config ...

    # ... unchanged response construction ...

保存插件配置时:

async def _save_plugin_configs(self, post_configs: dict, plugin_name: str):
    # ... plugin_md lookup unchanged ...
    try:
        errors, post_configs = validate_config(
            post_configs, getattr(md.config, "schema", {}), is_core=False
        )
        if errors:
            raise ValueError(f"格式校验未通过: {errors}")

        apply_plugin_file_ops(plugin_name, md, post_configs)
        md.config.save_config(post_configs)
    except Exception as e:
        raise e

这样既保留了原有行为,又能:

  • 将 Schema/路径/文件相关逻辑集中在一个模块中;
  • 让路由方法专注于 HTTP 相关处理,并委托给辅助模块;
  • 方便未来只在 plugin_file_config.py 中调整文件布局或迁移逻辑。
Original comment in English

issue (complexity): Consider extracting the new plugin file/config handling into a dedicated helper module so the route class stays lean and only orchestrates HTTP flow.

You can reduce the added complexity substantially by pulling the file/config logic into a focused helper and keeping the route class thin. This keeps all behavior but localizes the low‑level concerns.

1. Extract a PluginFileConfigManager

Move the helpers and core logic out of AstrBotConfig into a dedicated module, e.g. astrbot/core/config/plugin_file_config.py:

# astrbot/core/config/plugin_file_config.py

import os
from typing import Iterable
from astrbot.core.utils.astrbot_path import (
    get_astrbot_plugin_data_path,
    get_astrbot_temp_path,
)
from astrbot.core.utils.io import remove_dir


def sanitize_filename(name: str) -> str:
    cleaned = os.path.basename(name).strip()
    if not cleaned or cleaned in {".", ".."}:
        return ""
    for sep in (os.sep, os.altsep):
        if sep:
            cleaned = cleaned.replace(sep, "_")
    return cleaned


def sanitize_path_segment(segment: str) -> str:
    cleaned = []
    for ch in segment:
        if (
            ("a" <= ch <= "z")
            or ("A" <= ch <= "Z")
            or ch.isdigit()
            or ch in {"-", "_"}
        ):
            cleaned.append(ch)
        else:
            cleaned.append("_")
    result = "".join(cleaned).strip("_")
    return result or "_"


def config_key_to_folder(key_path: str) -> str:
    parts = [sanitize_path_segment(p) for p in key_path.split(".") if p]
    return "/".join(parts) if parts else "_"


def normalize_rel_path(rel_path: str | None) -> str | None:
    if not isinstance(rel_path, str):
        return None
    rel = rel_path.replace("\\", "/").lstrip("/")
    if not rel:
        return None
    parts = [p for p in rel.split("/") if p]
    if any(part in {".", ".."} for part in parts):
        return None
    if rel.startswith("../") or "/../" in rel:
        return None
    return "/".join(parts)

Keep the list / schema / fs logic separately in the same module:

def normalize_file_list(value, key_path: str) -> tuple[list[str], bool]:
    if value is None:
        return [], False
    if not isinstance(value, list):
        raise ValueError(f"Invalid file list for {key_path}")
    folder = config_key_to_folder(key_path)
    expected_prefix = f"files/{folder}/"
    results: list[str] = []
    changed = False
    for item in value:
        if not isinstance(item, str):
            raise ValueError(f"Invalid file entry for {key_path}")
        rel = normalize_rel_path(item)
        if not rel or not rel.startswith("files/"):
            raise ValueError(f"Invalid file path: {item}")
        if rel.startswith(expected_prefix):
            results.append(rel)
            continue
        if rel.count("/") == 1:
            filename = rel.split("/", 1)[1]
            if not filename:
                raise ValueError(f"Invalid file path: {item}")
            results.append(f"{expected_prefix}{filename}")
            changed = True
            continue
        raise ValueError(f"Invalid file path: {item}")
    return results, changed


def apply_plugin_file_ops(plugin_name: str, md, post_configs: dict) -> None:
    schema = getattr(md.config, "schema", None) if md and md.config else None
    if not isinstance(schema, dict):
        return

    # you can also move _collect_file_keys/_get_value_by_path/_set_value_by_path here

    # ... existing logic moved verbatim from AstrBotConfig._apply_plugin_file_ops ...

You can also move _collect_file_keys, _get_value_by_path, _set_value_by_path, and _get_schema_item into this module so all schema/path knowledge is in one place.

2. Thin the route class to orchestration only

Then AstrBotConfig becomes mostly orchestration, which is easier to read:

# config.py
from astrbot.core.config.plugin_file_config import (
    sanitize_filename,
    normalize_rel_path,
    config_key_to_folder,
    apply_plugin_file_ops,
)

async def upload_plugin_file(self):
    plugin_name = request.args.get("plugin_name")
    key_path = request.args.get("key")
    if not plugin_name or not key_path:
        return Response().error("Missing plugin_name or key parameter").__dict__

    md = self._get_plugin_metadata_by_name(plugin_name)
    if not md or not md.config:
        return Response().error(
            f"Plugin {plugin_name} not found or has no config",
        ).__dict__

    meta = self._get_schema_item(md.config.schema, key_path)
    if not meta or meta.get("type") != "file":
        return Response().error("Config item not found or not file type").__dict__

    file_types = meta.get("file_types")
    allowed_exts = []
    if isinstance(file_types, list):
        allowed_exts = [str(ext).lstrip(".").lower() for ext in file_types if str(ext).strip()]

    files = await request.files
    if not files:
        return Response().error("No files uploaded").__dict__

    staging_root = os.path.join(
        get_astrbot_temp_path(),
        "plugin_file_uploads",
        plugin_name,
    )
    os.makedirs(staging_root, exist_ok=True)

    uploaded = []
    folder = config_key_to_folder(key_path)
    errors = []
    for file in files.values():
        filename = sanitize_filename(file.filename or "")
        # ... unchanged logic using helpers from plugin_file_config ...

    # ... unchanged response construction ...

And for saving plugin configs:

async def _save_plugin_configs(self, post_configs: dict, plugin_name: str):
    # ... plugin_md lookup unchanged ...
    try:
        errors, post_configs = validate_config(
            post_configs, getattr(md.config, "schema", {}), is_core=False
        )
        if errors:
            raise ValueError(f"格式校验未通过: {errors}")

        apply_plugin_file_ops(plugin_name, md, post_configs)
        md.config.save_config(post_configs)
    except Exception as e:
        raise e

This preserves behavior but:

  • Concentrates schema/path/file logic into one module.
  • Keeps route methods focused on HTTP concerns and delegating to the helper.
  • Makes future changes to file layout or migration logic localized to plugin_file_config.py.

for plugin_md in star_registry:
if plugin_md.name == plugin_name:
return plugin_md
return None

@staticmethod
def _get_schema_item(schema: dict | None, key_path: str) -> dict | None:
if not isinstance(schema, dict) or not key_path:
return None
if key_path in schema:
return schema.get(key_path)

current = schema
parts = key_path.split(".")
for idx, part in enumerate(parts):
if part not in current:
return None
meta = current.get(part)
if idx == len(parts) - 1:
return meta
if not isinstance(meta, dict) or meta.get("type") != "object":
return None
current = meta.get("items", {})
return None

@staticmethod
def _sanitize_filename(name: str) -> str:
cleaned = os.path.basename(name).strip()
if not cleaned or cleaned in {".", ".."}:
return ""
for sep in (os.sep, os.altsep):
if sep:
cleaned = cleaned.replace(sep, "_")
return cleaned

@staticmethod
def _sanitize_path_segment(segment: str) -> str:
cleaned = []
for ch in segment:
if ("a" <= ch <= "z") or ("A" <= ch <= "Z") or ch.isdigit() or ch in {
"-",
"_",
}:
cleaned.append(ch)
else:
cleaned.append("_")
result = "".join(cleaned).strip("_")
return result or "_"

@classmethod
def _config_key_to_folder(cls, key_path: str) -> str:
parts = [cls._sanitize_path_segment(p) for p in key_path.split(".") if p]
return "/".join(parts) if parts else "_"

@staticmethod
def _normalize_rel_path(rel_path: str | None) -> str | None:
if not isinstance(rel_path, str):
return None
rel = rel_path.replace("\\", "/").lstrip("/")
if not rel:
return None
parts = [p for p in rel.split("/") if p]
if any(part in {".", ".."} for part in parts):
return None
if rel.startswith("../") or "/../" in rel:
return None
return "/".join(parts)

@staticmethod
def _get_value_by_path(data: dict, key_path: str):
if key_path in data:
return data.get(key_path)
current = data
for part in key_path.split("."):
if not isinstance(current, dict) or part not in current:
return None
current = current.get(part)
return current

@staticmethod
def _set_value_by_path(data: dict, key_path: str, value) -> None:
if key_path in data:
data[key_path] = value
return
current = data
parts = key_path.split(".")
for part in parts[:-1]:
if part not in current or not isinstance(current[part], dict):
current[part] = {}
current = current[part]
current[parts[-1]] = value

@classmethod
def _collect_file_keys(cls, schema: dict, prefix: str = "") -> list[str]:
keys = []
for key, meta in schema.items():
if not isinstance(meta, dict):
continue
meta_type = meta.get("type")
if meta_type == "file":
keys.append(f"{prefix}{key}" if prefix else key)
elif meta_type == "object":
child_prefix = f"{prefix}{key}." if prefix else f"{key}."
keys.extend(cls._collect_file_keys(meta.get("items", {}), child_prefix))
return keys

def _normalize_file_list(self, value, key_path: str) -> tuple[list[str], bool]:
if value is None:
return [], False
if not isinstance(value, list):
raise ValueError(f"Invalid file list for {key_path}")
folder = self._config_key_to_folder(key_path)
expected_prefix = f"files/{folder}/"
results = []
changed = False
for item in value:
if not isinstance(item, str):
raise ValueError(f"Invalid file entry for {key_path}")
rel = self._normalize_rel_path(item)
if not rel or not rel.startswith("files/"):
raise ValueError(f"Invalid file path: {item}")
if rel.startswith(expected_prefix):
results.append(rel)
continue
if rel.count("/") == 1:
filename = rel.split("/", 1)[1]
if not filename:
raise ValueError(f"Invalid file path: {item}")
results.append(f"{expected_prefix}{filename}")
changed = True
continue
raise ValueError(f"Invalid file path: {item}")
return results, changed

def _apply_plugin_file_ops(self, plugin_name: str, md, post_configs: dict) -> None:
schema = getattr(md.config, "schema", None) if md and md.config else None
if not isinstance(schema, dict):
return

file_keys = self._collect_file_keys(schema)
if not file_keys:
return

old_config = dict(md.config)
new_file_set = set()
old_file_set = set()

for key_path in file_keys:
new_list, new_changed = self._normalize_file_list(
self._get_value_by_path(post_configs, key_path),
key_path,
)
if new_changed:
self._set_value_by_path(post_configs, key_path, new_list)
old_list, _ = self._normalize_file_list(
self._get_value_by_path(old_config, key_path),
key_path,
)
new_file_set.update(new_list)
old_file_set.update(old_list)

plugin_data_dir = os.path.abspath(
os.path.join(get_astrbot_plugin_data_path(), plugin_name),
)
staging_root = os.path.abspath(
os.path.join(get_astrbot_temp_path(), "plugin_file_uploads", plugin_name),
)

for rel_path in sorted(new_file_set):
final_path = os.path.abspath(os.path.join(plugin_data_dir, rel_path))
if not final_path.startswith(plugin_data_dir + os.sep):
raise ValueError(f"Invalid file path: {rel_path}")
staged_path = os.path.abspath(os.path.join(staging_root, rel_path))
if not staged_path.startswith(staging_root + os.sep):
raise ValueError(f"Invalid staged path: {rel_path}")
if os.path.exists(staged_path):
os.makedirs(os.path.dirname(final_path), exist_ok=True)
os.replace(staged_path, final_path)
continue
legacy_path = os.path.join(
plugin_data_dir,
"files",
os.path.basename(rel_path),
)
if os.path.isfile(legacy_path):
os.makedirs(os.path.dirname(final_path), exist_ok=True)
os.replace(legacy_path, final_path)
continue
if not os.path.exists(final_path):
raise ValueError(f"Missing staged file: {rel_path}")
Comment on lines +1079 to +1089
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The legacy file migration logic assumes each file has a unique basename. If multiple file configuration items reference files with the same basename in the legacy "files/" directory, only the first item processed will successfully migrate the file, and subsequent items will fail with "Missing staged file" error. Consider either copying instead of moving legacy files (to allow reuse), or tracking which files have been migrated to avoid the error for duplicate references.

Copilot uses AI. Check for mistakes.

retained = new_file_set
for rel_path in sorted(old_file_set - retained):
final_path = os.path.abspath(os.path.join(plugin_data_dir, rel_path))
if not final_path.startswith(plugin_data_dir + os.sep):
continue
if os.path.isfile(final_path):
os.remove(final_path)
continue
legacy_path = os.path.join(
plugin_data_dir,
"files",
os.path.basename(rel_path),
)
if os.path.isfile(legacy_path):
os.remove(legacy_path)

if os.path.isdir(staging_root):
remove_dir(staging_root)

async def upload_plugin_file(self):
plugin_name = request.args.get("plugin_name")
key_path = request.args.get("key")
if not plugin_name or not key_path:
return Response().error("Missing plugin_name or key parameter").__dict__

md = self._get_plugin_metadata_by_name(plugin_name)
if not md or not md.config:
return Response().error(
f"Plugin {plugin_name} not found or has no config",
).__dict__

meta = self._get_schema_item(md.config.schema, key_path)
if not meta or meta.get("type") != "file":
return Response().error("Config item not found or not file type").__dict__

file_types = meta.get("file_types")
allowed_exts = []
if isinstance(file_types, list):
allowed_exts = [str(ext).lstrip(".").lower() for ext in file_types if str(ext).strip()]

files = await request.files
if not files:
return Response().error("No files uploaded").__dict__

staging_root = os.path.join(
get_astrbot_temp_path(),
"plugin_file_uploads",
plugin_name,
)
os.makedirs(staging_root, exist_ok=True)
Comment on lines +1135 to +1140
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plugin_name parameter from request args is used directly in path construction without sanitization. While the code validates that the plugin exists in the registry, if a malicious or misconfigured plugin has a name containing path traversal sequences like "../", it could potentially escape the intended directory structure. Consider sanitizing plugin_name similar to how _sanitize_path_segment is used for config keys, or add explicit validation to reject plugin names containing path separators.

Copilot uses AI. Check for mistakes.

uploaded = []
folder = self._config_key_to_folder(key_path)
errors = []
for file in files.values():
filename = self._sanitize_filename(file.filename or "")
if not filename:
errors.append("Invalid filename")
continue

file_size = getattr(file, "content_length", None)
if isinstance(file_size, int) and file_size > MAX_FILE_BYTES:
errors.append(f"File too large: {filename}")
continue

ext = os.path.splitext(filename)[1].lstrip(".").lower()
if allowed_exts and ext not in allowed_exts:
errors.append(f"Unsupported file type: {filename}")
continue

rel_path = f"files/{folder}/{filename}"
save_path = os.path.join(staging_root, rel_path)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
await file.save(save_path)
if os.path.isfile(save_path) and os.path.getsize(save_path) > MAX_FILE_BYTES:
os.remove(save_path)
errors.append(f"File too large: {filename}")
continue
uploaded.append(rel_path)

if not uploaded:
return Response().error(
"Upload failed: " + ", ".join(errors) if errors else "Upload failed",
).__dict__

return Response().ok({"uploaded": uploaded, "errors": errors}).__dict__

async def delete_plugin_file(self):
"""Delete a staged upload under temp; final deletion happens on config save."""
plugin_name = request.args.get("plugin_name")
if not plugin_name:
return Response().error("Missing plugin_name parameter").__dict__

data = await request.get_json()
rel_path = data.get("path") if isinstance(data, dict) else None
rel_path = self._normalize_rel_path(rel_path)
if not rel_path or not rel_path.startswith("files/"):
return Response().error("Invalid path parameter").__dict__

md = self._get_plugin_metadata_by_name(plugin_name)
if not md:
return Response().error(f"Plugin {plugin_name} not found").__dict__

staging_root = os.path.abspath(
os.path.join(get_astrbot_temp_path(), "plugin_file_uploads", plugin_name),
)
staged_path = os.path.abspath(
os.path.normpath(os.path.join(staging_root, rel_path)),
)
if staged_path.startswith(staging_root + os.sep) and os.path.isfile(staged_path):
os.remove(staged_path)

return Response().ok(None, "Deletion staged").__dict__

async def post_new_platform(self):
new_platform_config = await request.json

Expand Down Expand Up @@ -1132,6 +1458,13 @@ async def _save_plugin_configs(self, post_configs: dict, plugin_name: str):
raise ValueError(f"插件 {plugin_name} 没有注册配置")

try:
save_config(post_configs, md.config)
errors, post_configs = validate_config(
post_configs, getattr(md.config, "schema", {}), is_core=False
)
if errors:
raise ValueError(f"格式校验未通过: {errors}")

self._apply_plugin_file_ops(plugin_name, md, post_configs)
md.config.save_config(post_configs)
except Exception as e:
raise e
Loading