Files
MoviePilot/app/core/plugin.py

1710 lines
67 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import ast
import asyncio
import concurrent
import concurrent.futures
import importlib.util
import inspect
import os
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, Union, Callable, Tuple
from fastapi import HTTPException
from starlette import status
from watchfiles import watch
from app import schemas
from app.core.cache import fresh, async_fresh
from app.core.config import settings
from app.core.event import eventmanager
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
from app.utils.mixins import ConfigReloadMixin
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
class PluginManager(ConfigReloadMixin, metaclass=Singleton):
"""插件管理器"""
CONFIG_WATCH = {"DEV", "PLUGIN_AUTO_RELOAD"}
def __init__(self):
# 插件列表
self._plugins: dict = {}
# 运行态插件列表
self._running_plugins: dict = {}
# 配置Key
self._config_key: str = "plugin.%s"
# 监控线程
self._monitor_thread: Optional[threading.Thread] = None
# 监控停止事件
self._stop_monitor_event = threading.Event()
# 开发者模式监测插件修改
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
self.__start_monitor()
def init_config(self):
# 停止已有插件
self.stop()
# 启动插件
self.start()
def start(self, pid: Optional[str] = None):
"""
启动加载插件
:param pid: 插件ID为空加载所有插件
"""
def check_module(module: Any):
"""
检查模块
"""
if not hasattr(module, 'init_plugin') or not hasattr(module, "plugin_name"):
return False
return True
# 已安装插件
installed_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 扫描插件目录,只加载符合条件的插件
plugins = self._load_selective_plugins(pid, installed_plugins, check_module)
# 排序
plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0)
for plugin in plugins:
plugin_id = plugin.__name__
if pid and plugin_id != pid:
continue
try:
# 判断插件是否满足认证要求,如不满足则不进行实例化
if not self.__set_and_check_auth_level(plugin=plugin):
# 如果是插件热更新实例,这里则进行替换
if plugin_id in self._plugins:
self._plugins[plugin_id] = plugin
continue
# 存储Class
self._plugins[plugin_id] = plugin
# 生成实例
plugin_obj = plugin()
# 生效插件配置
plugin_obj.init_plugin(self.get_plugin_config(plugin_id))
# 存储运行实例
self._running_plugins[plugin_id] = plugin_obj
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
# 启用的插件才设置事件注册状态可用
if plugin_obj.get_state():
eventmanager.enable_event_handler(plugin)
else:
eventmanager.disable_event_handler(plugin)
except Exception as err:
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
def init_plugin(self, plugin_id: str, conf: dict):
"""
初始化插件
:param plugin_id: 插件ID
:param conf: 插件配置
"""
plugin = self._running_plugins.get(plugin_id)
if not plugin:
return
# 初始化插件
plugin.init_plugin(conf)
# 检查插件状态并启用/禁用事件处理器
if plugin.get_state():
# 启用插件类的事件处理器
eventmanager.enable_event_handler(type(plugin))
else:
# 禁用插件类的事件处理器
eventmanager.disable_event_handler(type(plugin))
def stop(self, pid: Optional[str] = None):
"""
停止插件服务
:param pid: 插件ID为空停止所有插件
"""
# 停止插件
if pid:
logger.info(f"正在停止插件 {pid}...")
plugin_obj = self._running_plugins.get(pid)
if not plugin_obj:
logger.debug(f"插件 {pid} 不存在或未加载")
return
plugins = {pid: plugin_obj}
else:
logger.info("正在停止所有插件...")
plugins = self._running_plugins
for plugin_id, plugin in plugins.items():
eventmanager.disable_event_handler(type(plugin))
self.__stop_plugin(plugin)
# 清空对像
if pid:
# 清空指定插件
self._plugins.pop(pid, None)
self._running_plugins.pop(pid, None)
# 清除插件模块缓存,包括所有子模块
self._clear_plugin_modules(pid)
else:
# 清空
self._plugins = {}
self._running_plugins = {}
# 清除所有插件模块缓存
self._clear_plugin_modules()
logger.info("插件停止完成")
@staticmethod
def _load_selective_plugins(pid: Optional[str], installed_plugins: List[str],
check_module_func: Callable) -> List[Any]:
"""
选择性加载插件只import符合条件的插件
:param pid: 指定插件ID为空则加载所有已安装插件
:param installed_plugins: 已安装插件列表
:param check_module_func: 模块检查函数
:return: 插件类列表
"""
import importlib
plugins = []
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
if not plugins_dir.exists():
logger.warning(f"插件目录不存在:{plugins_dir}")
return plugins
# 确定需要加载的插件目录名称列表
if pid:
# 加载指定插件
target_plugins = [pid.lower()]
else:
# 加载已安装插件
target_plugins = [plugin_id.lower() for plugin_id in installed_plugins]
if not target_plugins:
logger.debug("没有需要加载的插件")
return plugins
# 扫描plugins目录
_loaded_modules = set()
for plugin_dir in plugins_dir.iterdir():
if not plugin_dir.is_dir() or plugin_dir.name.startswith('_'):
continue
# 检查是否是需要加载的插件
if plugin_dir.name not in target_plugins:
logger.debug(f"跳过插件目录:{plugin_dir.name}(不在加载列表中)")
continue
# 检查__init__.py是否存在
init_file = plugin_dir / "__init__.py"
if not init_file.exists():
logger.debug(f"跳过插件目录:{plugin_dir.name}缺少__init__.py")
continue
try:
# 构建模块名
module_name = f"app.plugins.{plugin_dir.name}"
logger.debug(f"正在导入插件模块:{module_name}")
# 导入模块
module = importlib.import_module(module_name)
# 检查模块中的类
for name, obj in module.__dict__.items():
if name.startswith('_') or not isinstance(obj, type):
continue
if name in _loaded_modules:
continue
if check_module_func(obj):
_loaded_modules.add(name)
plugins.append(obj)
logger.debug(f"找到符合条件的插件类:{name}")
break
except Exception as err:
logger.error(f"加载插件 {plugin_dir.name} 失败:{str(err)} - {traceback.format_exc()}")
return plugins
@property
def running_plugins(self) -> Dict[str, Any]:
"""
获取运行态插件列表
:return: 运行态插件列表
"""
return self._running_plugins
@property
def plugins(self) -> Dict[str, Any]:
"""
获取插件列表
:return: 插件列表
"""
return self._plugins
def on_config_changed(self):
self.reload_monitor()
def get_reload_name(self) -> str:
return "插件文件修改监测"
def reload_monitor(self):
"""
重新加载插件文件修改监测
"""
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
# 先关闭已有监测,再重新启动
self.stop_monitor()
self.__start_monitor()
else:
self.stop_monitor()
def __start_monitor(self):
"""
启用监测插件文件修改监测
"""
if self._monitor_thread and self._monitor_thread.is_alive():
logger.info("插件文件修改监测已经在运行中...")
return
logger.info("开始监测插件文件修改...")
# 在启动新线程之前,确保停止事件是清除状态
self._stop_monitor_event.clear()
# 创建并启动监控线程
self._monitor_thread = threading.Thread(
target=self._run_file_watcher,
daemon=True
)
self._monitor_thread.start()
def stop_monitor(self):
"""
停止监测插件文件修改监测
"""
if self._monitor_thread and self._monitor_thread.is_alive():
logger.info("正在停止插件文件修改监测...")
self._stop_monitor_event.set()
self._monitor_thread.join(timeout=5)
if self._monitor_thread.is_alive():
logger.warning("插件文件修改监测线程在5秒内未能正常停止。")
self._monitor_thread = None
logger.info("插件文件修改监测停止完成")
else:
logger.info("未启用插件文件修改监测,无需停止")
def _run_file_watcher(self):
"""
运行 watchfiles 监视器的主循环。
"""
# 监视插件目录
plugins_path = str(settings.ROOT_PATH / "app" / "plugins")
logger.info(">>> 监控线程已启动准备进入watch循环...")
# 使用 watchfiles 监视目录变化,并响应变化事件
# Todo: yield_on_timeout = True 时,每秒检查停止事件,会返回空集合;后续可以考虑用来做心跳之类的功能?
for changes in watch(plugins_path, stop_event=self._stop_monitor_event, rust_timeout=1000,
yield_on_timeout=True):
# 如果收到停止事件,退出循环
if not changes:
continue
# 处理变化事件
plugins_to_reload = set()
for _change_type, path_str in changes:
event_path = Path(path_str)
# 跳过非 .py 文件以及 pycache 目录中的文件
if not event_path.name.endswith(".py") or "__pycache__" in event_path.parts:
continue
# 解析插件ID
pid = self._get_plugin_id_from_path(event_path)
# 跳过无效插件文件
if pid:
# 收集需要重载的插件ID自动去重避免重复重载
plugins_to_reload.add(pid)
# 触发重载
if plugins_to_reload:
logger.info(f"检测到插件文件变化,准备重载: {list(plugins_to_reload)}")
for pid in plugins_to_reload:
try:
self.reload_plugin(pid)
except Exception as e:
logger.error(f"插件 {pid} 热重载失败: {e}", exc_info=True)
@staticmethod
def _get_plugin_id_from_path(event_path: Path) -> Optional[str]:
"""
根据文件路径解析出插件的ID。
:param event_path: 被修改文件的 Path 对象。
:return: 插件ID字符串如果不是有效插件文件则返回 None。
"""
try:
plugins_root = settings.ROOT_PATH / "app" / "plugins"
# 确保修改的文件在 plugins 目录下
if not event_path.is_relative_to(plugins_root):
return None
try:
plugin_dir_name = event_path.relative_to(plugins_root).parts[0]
plugin_dir = plugins_root / plugin_dir_name
except (ValueError, IndexError):
return None
init_file = plugin_dir / "__init__.py"
if not init_file.exists():
return None
# 读取 __init__.py 文件,查找插件主类名
with open(init_file, "r", encoding="utf-8") as f:
source_code = f.read()
tree = ast.parse(source_code)
# 遍历AST查找继承自 _PluginBase 的类
for node in ast.walk(tree):
# 检查节点是否为类定义
if isinstance(node, ast.ClassDef):
# 遍历该类的所有基类
for base in node.bases:
# 检查基类是否是我们寻找的 _PluginBase
# ast.Name 用于处理简单的基类名
if isinstance(base, ast.Name) and base.id == '_PluginBase':
# 返回这个类的名字
return node.name
return None
except Exception as e:
logger.error(f"从路径解析插件ID时出错: {e}")
return None
@staticmethod
def __stop_plugin(plugin: Any):
"""
停止插件
:param plugin: 插件实例
"""
try:
# 关闭数据库
if hasattr(plugin, "close"):
plugin.close()
# 关闭插件
if hasattr(plugin, "stop_service"):
plugin.stop_service()
except Exception as e:
logger.warn(f"停止插件 {plugin.get_name()} 时发生错误: {str(e)}")
def remove_plugin(self, plugin_id: str):
"""
从内存中移除一个插件
:param plugin_id: 插件ID
"""
self.stop(plugin_id)
def reload_plugin(self, plugin_id: str):
"""
将一个插件重新加载到内存
:param plugin_id: 插件ID
"""
# 先移除插件实例
self.stop(plugin_id)
# 重新加载
self.start(plugin_id)
# 广播事件
eventmanager.send_event(EventType.PluginReload, data={"plugin_id": plugin_id})
@staticmethod
def _clear_plugin_modules(plugin_id: Optional[str] = None):
"""
清除插件及其所有子模块的缓存
:param plugin_id: 插件ID
"""
# 构建插件模块前缀
if plugin_id:
plugin_module_prefix = f"app.plugins.{plugin_id.lower()}"
else:
plugin_module_prefix = "app.plugins"
# 收集需要删除的模块名(创建模块名列表的副本以避免迭代时修改字典)
modules_to_remove = []
for module_name in list(sys.modules.keys()):
if module_name == plugin_module_prefix or module_name.startswith(plugin_module_prefix + "."):
modules_to_remove.append(module_name)
# 删除模块
for module_name in modules_to_remove:
try:
del sys.modules[module_name]
logger.debug(f"已清除插件模块缓存:{module_name}")
except KeyError:
# 模块可能已经被删除
pass
importlib.invalidate_caches()
logger.debug("已清除查找器的缓存")
if plugin_id:
if modules_to_remove:
logger.info(f"插件 {plugin_id} 共清除 {len(modules_to_remove)} 个模块缓存:{modules_to_remove}")
else:
logger.debug(f"插件 {plugin_id} 没有找到需要清除的模块缓存")
def sync(self) -> List[str]:
"""
安装本地不存在或需要更新的插件
"""
def install_plugin(plugin):
start_time = time.time()
state, msg = PluginHelper().install(pid=plugin.id, repo_url=plugin.repo_url, force_install=True)
elapsed_time = time.time() - start_time
if state:
logger.info(
f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version},耗时:{elapsed_time:.2f}")
sync_plugins.append(plugin.id)
else:
logger.error(
f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg},耗时:{elapsed_time:.2f}")
failed_plugins.append(plugin.id)
if SystemUtils.is_frozen():
return []
# 获取已安装插件列表
install_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 获取在线插件列表
online_plugins = self.get_online_plugins()
# 确定需要安装的插件
plugins_to_install = [
plugin for plugin in online_plugins
if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id, plugin.plugin_version)
]
if not plugins_to_install:
return []
logger.info("开始安装第三方插件...")
sync_plugins = []
failed_plugins = []
# 使用 ThreadPoolExecutor 进行并发安装
total_start_time = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(install_plugin, plugin): plugin
for plugin in plugins_to_install
}
for future in as_completed(futures):
plugin = futures[future]
try:
future.result()
except Exception as exc:
logger.error(f"插件 {plugin.plugin_name} 安装过程中出现异常: {exc}")
total_elapsed_time = time.time() - total_start_time
logger.info(
f"第三方插件安装完成,成功:{len(sync_plugins)} 个,"
f"失败:{len(failed_plugins)} 个,总耗时:{total_elapsed_time:.2f}"
)
return sync_plugins
@staticmethod
def install_plugin_missing_dependencies() -> List[str]:
"""
安装插件中缺失或不兼容的依赖项
"""
pluginhelper = PluginHelper()
# 第一步:获取需要安装的依赖项列表
missing_dependencies = pluginhelper.find_missing_dependencies()
if not missing_dependencies:
return missing_dependencies
logger.debug(f"检测到缺失的依赖项: {missing_dependencies}")
logger.info(f"开始安装缺失的依赖项,共 {len(missing_dependencies)} 个...")
# 第二步:安装依赖项并返回结果
total_start_time = time.time()
success, message = pluginhelper.install_dependencies(missing_dependencies)
total_elapsed_time = time.time() - total_start_time
if success:
logger.info(f"已完成 {len(missing_dependencies)} 个依赖项安装,总耗时:{total_elapsed_time:.2f}")
else:
logger.warning(f"存在缺失依赖项安装失败,请尝试手动安装,总耗时:{total_elapsed_time:.2f}")
return missing_dependencies
def get_plugin_config(self, pid: str) -> dict:
"""
获取插件配置
:param pid: 插件ID
"""
if not self._plugins.get(pid):
return {}
conf = SystemConfigOper().get(self._config_key % pid)
if conf:
# 去掉空Key
return {k: v for k, v in conf.items() if k}
return {}
def save_plugin_config(self, pid: str, conf: dict, force: bool = False) -> bool:
"""
保存插件配置
:param pid: 插件ID
:param conf: 配置
:param force: 强制保存
"""
if not force and not self._plugins.get(pid):
return False
SystemConfigOper().set(self._config_key % pid, conf)
return True
def delete_plugin_config(self, pid: str) -> bool:
"""
删除插件配置
:param pid: 插件ID
"""
if not self._plugins.get(pid):
return False
return SystemConfigOper().delete(self._config_key % pid)
def delete_plugin_data(self, pid: str) -> bool:
"""
删除插件数据
:param pid: 插件ID
"""
if not self._plugins.get(pid):
return False
PluginDataOper().del_data(pid)
return True
def get_plugin_state(self, pid: str) -> bool:
"""
获取插件状态
:param pid: 插件ID
"""
plugin = self._running_plugins.get(pid)
return plugin.get_state() if plugin else False
def get_plugin_commands(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件命令
[{
"cmd": "/xx",
"event": EventType.xx,
"desc": "xxxx",
"data": {},
"pid": "",
}]
"""
ret_commands = []
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_command") and ObjectUtils.check_method(plugin.get_command):
try:
if not plugin.get_state():
continue
commands = plugin.get_command() or []
for command in commands:
command["pid"] = plugin_id
ret_commands.extend(commands)
except Exception as e:
logger.error(f"获取插件命令出错:{str(e)}")
return ret_commands
def get_plugin_apis(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件API
[{
"path": "/xx",
"endpoint": self.xxx,
"methods": ["GET", "POST"],
"summary": "API名称",
"description": "API说明",
"allow_anonymous": false
}]
"""
ret_apis = []
if pid:
plugins = {pid: self._running_plugins.get(pid)}
else:
plugins = self._running_plugins
for plugin_id, plugin in plugins.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_api") and ObjectUtils.check_method(plugin.get_api):
try:
apis = plugin.get_api() or []
for api in apis:
api["path"] = f"/{plugin_id}{api['path']}"
if not api.get("auth"):
api["auth"] = "apikey"
ret_apis.extend(apis)
except Exception as e:
logger.error(f"获取插件 {plugin_id} API出错{str(e)}")
return ret_apis
def get_plugin_services(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件服务
[{
"id": "服务ID",
"name": "服务名称",
"trigger": "触发器cron、interval、date、CronTrigger.from_crontab()",
"func": self.xxx,
"kwargs": {} # 定时器参数,
"func_kwargs": {} # 方法参数
}]
"""
ret_services = []
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_service") and ObjectUtils.check_method(plugin.get_service):
try:
if not plugin.get_state():
continue
services = plugin.get_service() or []
ret_services.extend(services)
except Exception as e:
logger.error(f"获取插件 {plugin_id} 服务出错:{str(e)}")
return ret_services
def get_plugin_modules(self, pid: Optional[str] = None) -> Dict[tuple, Dict[str, Any]]:
"""
获取插件模块
{
plugin_id: {
method: function
}
}
"""
ret_modules = {}
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_module") and ObjectUtils.check_method(plugin.get_module):
try:
if not plugin.get_state():
continue
plugin_module = plugin.get_module() or []
ret_modules[(plugin_id, plugin.get_name())] = plugin_module
except Exception as e:
logger.error(f"获取插件 {plugin_id} 模块出错:{str(e)}")
return ret_modules
def get_plugin_actions(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件动作
[{
"id": "动作ID",
"name": "动作名称",
"func": self.xxx,
"kwargs": {} # 需要附加传递的参数
}]
"""
ret_actions = []
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_actions") and ObjectUtils.check_method(plugin.get_actions):
try:
if not plugin.get_state():
continue
actions = plugin.get_actions()
if actions:
ret_actions.append({
"plugin_id": plugin_id,
"plugin_name": plugin.plugin_name,
"actions": actions
})
except Exception as e:
logger.error(f"获取插件 {plugin_id} 动作出错:{str(e)}")
return ret_actions
def get_plugin_agent_tools(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件智能体工具
[{
"plugin_id": "插件ID",
"plugin_name": "插件名称",
"tools": [ToolClass1, ToolClass2, ...]
}]
"""
ret_tools = []
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_agent_tools") and ObjectUtils.check_method(plugin.get_agent_tools):
try:
if not plugin.get_state():
continue
tools = plugin.get_agent_tools()
if tools:
ret_tools.append({
"plugin_id": plugin_id,
"plugin_name": plugin.plugin_name,
"tools": tools
})
except Exception as e:
logger.error(f"获取插件 {plugin_id} 智能体工具出错:{str(e)}")
return ret_tools
@staticmethod
def get_plugin_remote_entry(plugin_id: str, dist_path: str) -> str:
"""
获取插件的远程入口地址
:param plugin_id: 插件 ID
:param dist_path: 插件的分发路径
:return: 远程入口地址
"""
if dist_path.startswith("/"):
dist_path = dist_path[1:]
if dist_path.endswith("/"):
dist_path = dist_path[:-1]
return f"/plugin/file/{plugin_id.lower()}/{dist_path}/remoteEntry.js"
def get_plugin_remotes(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件联邦组件列表
"""
remotes = []
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_render_mode"):
render_mode, dist_path = plugin.get_render_mode()
if render_mode != "vue":
continue
remotes.append({
"id": plugin_id,
"url": self.get_plugin_remote_entry(plugin_id, dist_path),
"name": plugin.plugin_name,
})
return remotes
def get_plugin_dashboard_meta(self) -> List[Dict[str, str]]:
"""
获取所有插件仪表盘元信息
"""
dashboard_meta = []
# 创建字典快照避免并发修改
running_plugins_snapshot = dict(self._running_plugins)
for plugin_id, plugin in running_plugins_snapshot.items():
if not hasattr(plugin, "get_dashboard") or not ObjectUtils.check_method(plugin.get_dashboard):
continue
try:
if not plugin.get_state():
continue
# 如果是多仪表盘实现
if hasattr(plugin, "get_dashboard_meta") and ObjectUtils.check_method(plugin.get_dashboard_meta):
meta = plugin.get_dashboard_meta()
if meta:
dashboard_meta.extend([{
"id": plugin_id,
"name": m.get("name"),
"key": m.get("key"),
} for m in meta if m])
else:
dashboard_meta.append({
"id": plugin_id,
"name": plugin.plugin_name,
"key": "",
})
except Exception as e:
logger.error(f"获取插件[{plugin_id}]仪表盘元数据出错:{str(e)}")
return dashboard_meta
def get_plugin_dashboard(self, pid: str, key: str, user_agent: str = None) -> schemas.PluginDashboard:
"""
获取插件仪表盘
"""
def __get_params_count(func: Callable):
"""
获取函数的参数信息
"""
signature = inspect.signature(func)
return len(signature.parameters)
# 获取插件实例
plugin_instance = self.running_plugins.get(pid)
if not plugin_instance:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"插件 {pid} 不存在或未加载")
# 渲染模式
render_mode, _ = plugin_instance.get_render_mode()
# 获取插件仪表板
try:
# 检查方法的参数个数
params_count = __get_params_count(plugin_instance.get_dashboard)
if params_count > 1:
dashboard: Tuple = plugin_instance.get_dashboard(key=key, user_agent=user_agent)
elif params_count > 0:
dashboard: Tuple = plugin_instance.get_dashboard(user_agent=user_agent)
else:
dashboard: Tuple = plugin_instance.get_dashboard()
except Exception as e:
logger.error(f"插件 {pid} 调用方法 get_dashboard 出错: {str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"插件 {pid} 调用方法 get_dashboard 出错: {str(e)}")
cols, attrs, elements = dashboard
return schemas.PluginDashboard(
id=pid,
name=plugin_instance.plugin_name,
key=key,
render_mode=render_mode,
cols=cols or {},
attrs=attrs or {},
elements=elements
)
def get_plugin_attr(self, pid: str, attr: str) -> Any:
"""
获取插件属性
:param pid: 插件ID
:param attr: 属性名
"""
plugin = self._running_plugins.get(pid)
if not plugin:
return None
if not hasattr(plugin, attr):
return None
return getattr(plugin, attr)
def run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any:
"""
运行插件方法
:param pid: 插件ID
:param method: 方法名
:param args: 参数
:param kwargs: 关键字参数
"""
plugin = self._running_plugins.get(pid)
if not plugin:
return None
if not hasattr(plugin, method):
return None
return getattr(plugin, method)(*args, **kwargs)
async def async_run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any:
"""
异步运行插件方法
:param pid: 插件ID
:param method: 方法名
:param args: 参数
:param kwargs: 关键字参数
"""
plugin = self._running_plugins.get(pid)
if not plugin:
return None
if not hasattr(plugin, method):
return None
method_func = getattr(plugin, method)
if asyncio.iscoroutinefunction(method_func):
return await method_func(*args, **kwargs)
else:
return method_func(*args, **kwargs)
def get_plugin_ids(self) -> List[str]:
"""
获取所有插件ID
"""
return list(self._plugins.keys())
def get_running_plugin_ids(self) -> List[str]:
"""
获取所有运行态插件ID
"""
return list(self._running_plugins.keys())
def get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
获取所有在线插件信息
"""
if not settings.PLUGIN_MARKET:
return []
# 用于存储高于 v1 版本的插件(如 v2, v3 等)
higher_version_plugins = []
# 用于存储 v1 版本插件
base_version_plugins = []
# 使用多线程获取线上插件
with concurrent.futures.ThreadPoolExecutor() as executor:
futures_to_version = {}
for m in settings.PLUGIN_MARKET.split(","):
if not m:
continue
# 提交任务获取 v1 版本插件,存储 future 到 version 的映射
base_future = executor.submit(self.get_plugins_from_market, m, None, force)
futures_to_version[base_future] = "base_version"
# 提交任务获取高版本插件(如 v2、v3存储 future 到 version 的映射
if settings.VERSION_FLAG:
higher_version_future = executor.submit(self.get_plugins_from_market, m,
settings.VERSION_FLAG, force)
futures_to_version[higher_version_future] = "higher_version"
# 按照完成顺序处理结果
for future in concurrent.futures.as_completed(futures_to_version):
plugins = future.result()
version = futures_to_version[future]
if plugins:
if version == "higher_version":
higher_version_plugins.extend(plugins) # 收集高版本插件
else:
base_version_plugins.extend(plugins) # 收集 v1 版本插件
return self._process_plugins_list(higher_version_plugins, base_version_plugins)
def get_local_plugins(self) -> List[schemas.Plugin]:
"""
获取所有本地已下载的插件信息
"""
# 返回值
plugins = []
# 已安装插件
installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
for pid, plugin_class in self._plugins.items():
# 运行状插件
plugin_obj = self._running_plugins.get(pid)
# 基本属性
plugin = schemas.Plugin()
# ID
plugin.id = pid
# 安装状态
if pid in installed_apps:
plugin.installed = True
else:
plugin.installed = False
# 运行状态
if plugin_obj and hasattr(plugin_obj, "get_state"):
try:
state = plugin_obj.get_state()
except Exception as e:
logger.error(f"获取插件 {pid} 状态出错:{str(e)}")
state = False
plugin.state = state
else:
plugin.state = False
# 是否有详情页面
if hasattr(plugin_class, "get_page"):
if ObjectUtils.check_method(plugin_class.get_page):
plugin.has_page = True
else:
plugin.has_page = False
# 公钥
if hasattr(plugin_class, "plugin_public_key"):
plugin.plugin_public_key = plugin_class.plugin_public_key
# 权限
if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_class):
continue
# 名称
if hasattr(plugin_class, "plugin_name"):
plugin.plugin_name = plugin_class.plugin_name
# 描述
if hasattr(plugin_class, "plugin_desc"):
plugin.plugin_desc = plugin_class.plugin_desc
# 版本
if hasattr(plugin_class, "plugin_version"):
plugin.plugin_version = plugin_class.plugin_version
# 图标
if hasattr(plugin_class, "plugin_icon"):
plugin.plugin_icon = plugin_class.plugin_icon
# 作者
if hasattr(plugin_class, "plugin_author"):
plugin.plugin_author = plugin_class.plugin_author
# 作者链接
if hasattr(plugin_class, "author_url"):
plugin.author_url = plugin_class.author_url
# 加载顺序
if hasattr(plugin_class, "plugin_order"):
plugin.plugin_order = plugin_class.plugin_order
# 是否需要更新
plugin.has_update = False
# 本地标志
plugin.is_local = True
# 汇总
plugins.append(plugin)
# 根据加载排序重新排序
plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0)
return plugins
@staticmethod
def is_plugin_exists(pid: str, version: str = None) -> bool:
"""
判断插件是否存在,并满足版本要求(有传入version时)
:param pid: 插件ID
:param version: 插件版本
"""
if not pid:
return False
try:
# 构建包名
package_name = f"app.plugins.{pid.lower()}"
# 检查包是否存在
spec = importlib.util.find_spec(package_name)
package_exists = spec is not None and spec.origin is not None
logger.debug(f"{pid} exists: {package_exists}")
if not package_exists:
return False
local_version = PluginManager().get_plugin_attr(pid=pid, attr="plugin_version")
if not local_version:
return False
if version and not StringUtils.compare_version(local_version, ">=", version):
logger.warn(f"Plugin {pid} version: {local_version} (older than version: {version})")
return False
return True
except Exception as e:
logger.debug(f"获取插件是否在本地包中存在失败,{e}")
return False
def get_plugins_from_market(self, market: str,
package_version: Optional[str] = None,
force: bool = False) -> Optional[List[schemas.Plugin]]:
"""
从指定的市场获取插件信息
:param market: 市场的 URL 或标识
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
:param force: 是否强制刷新(忽略缓存)
:return: 返回插件的列表,若获取失败返回 []
"""
if not market:
return []
# 已安装插件
installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 获取在线插件
with fresh(force):
online_plugins = PluginHelper().get_plugins(market, package_version)
if online_plugins is None:
logger.warning(
f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
return []
ret_plugins = []
add_time = len(online_plugins)
for pid, plugin_info in online_plugins.items():
plugin = self._process_plugin_info(pid, plugin_info, market, installed_apps, add_time, package_version)
if plugin:
ret_plugins.append(plugin)
add_time -= 1
return ret_plugins
@staticmethod
def _process_plugins_list(higher_version_plugins: List[schemas.Plugin],
base_version_plugins: List[schemas.Plugin]) -> List[schemas.Plugin]:
"""
处理插件列表:合并、去重、排序、保留最高版本
:param higher_version_plugins: 高版本插件列表
:param base_version_plugins: 基础版本插件列表
:return: 处理后的插件列表
"""
# 优先处理高版本插件
all_plugins = []
all_plugins.extend(higher_version_plugins)
# 将未出现在高版本插件列表中的 v1 插件加入 all_plugins
higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins}
all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids])
# 去重
all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values())
# 所有插件按 repo 在设置中的顺序排序
all_plugins.sort(
key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0
)
# 相同 ID 的插件保留版本号最大的版本
max_versions = {}
for p in all_plugins:
if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]):
max_versions[p.id] = p.plugin_version
result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]]
logger.info(f"共获取到 {len(result)} 个线上插件")
return result
def _process_plugin_info(self, pid: str, plugin_info: dict, market: str,
installed_apps: List[str], add_time: int,
package_version: Optional[str] = None) -> Optional[schemas.Plugin]:
"""
处理单个插件信息,创建 schemas.Plugin 对象
:param pid: 插件ID
:param plugin_info: 插件信息字典
:param market: 市场URL
:param installed_apps: 已安装插件列表
:param add_time: 添加顺序
:param package_version: 包版本
:return: 创建的插件对象如果验证失败返回None
"""
if not isinstance(plugin_info, dict):
return None
# 如 package_version 为空,则需要判断插件是否兼容当前版本
if not package_version:
if plugin_info.get(settings.VERSION_FLAG) is not True:
# 插件当前版本不兼容
return None
# 运行状插件
plugin_obj = self._running_plugins.get(pid)
# 非运行态插件
plugin_static = self._plugins.get(pid)
# 基本属性
plugin = schemas.Plugin()
# ID
plugin.id = pid
# 安装状态
if pid in installed_apps and plugin_static:
plugin.installed = True
else:
plugin.installed = False
# 是否有新版本
plugin.has_update = False
if plugin_static:
installed_version = getattr(plugin_static, "plugin_version")
if StringUtils.compare_version(installed_version, "<", plugin_info.get("version")):
# 需要更新
plugin.has_update = True
# 运行状态
if plugin_obj and hasattr(plugin_obj, "get_state"):
try:
state = plugin_obj.get_state()
except Exception as e:
logger.error(f"获取插件 {pid} 状态出错:{str(e)}")
state = False
plugin.state = state
else:
plugin.state = False
# 是否有详情页面
plugin.has_page = False
if plugin_obj and hasattr(plugin_obj, "get_page"):
if ObjectUtils.check_method(plugin_obj.get_page):
plugin.has_page = True
# 公钥
if plugin_info.get("key"):
plugin.plugin_public_key = plugin_info.get("key")
# 权限
if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_info):
return None
# 名称
if plugin_info.get("name"):
plugin.plugin_name = plugin_info.get("name")
# 描述
if plugin_info.get("description"):
plugin.plugin_desc = plugin_info.get("description")
# 版本
if plugin_info.get("version"):
plugin.plugin_version = plugin_info.get("version")
# 图标
if plugin_info.get("icon"):
plugin.plugin_icon = plugin_info.get("icon")
# 标签
if plugin_info.get("labels"):
plugin.plugin_label = plugin_info.get("labels")
# 作者
if plugin_info.get("author"):
plugin.plugin_author = plugin_info.get("author")
# 更新历史
if plugin_info.get("history"):
plugin.history = plugin_info.get("history")
# 仓库链接
plugin.repo_url = market
# 本地标志
plugin.is_local = False
# 添加顺序
plugin.add_time = add_time
return plugin
async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
异步获取所有在线插件信息
:param force: 是否强制刷新(忽略缓存)
"""
if not settings.PLUGIN_MARKET:
return []
# 用于存储高于 v1 版本的插件(如 v2, v3 等)
higher_version_plugins = []
# 用于存储 v1 版本插件
base_version_plugins = []
# 使用异步并发获取线上插件
import asyncio
tasks = []
task_to_version = {}
for m in settings.PLUGIN_MARKET.split(","):
if not m:
continue
# 创建任务获取 v1 版本插件
base_task = asyncio.create_task(self.async_get_plugins_from_market(m, None, force))
tasks.append(base_task)
task_to_version[base_task] = "base_version"
# 创建任务获取高版本插件(如 v2、v3
if settings.VERSION_FLAG:
higher_version_task = asyncio.create_task(
self.async_get_plugins_from_market(m, settings.VERSION_FLAG, force))
tasks.append(higher_version_task)
task_to_version[higher_version_task] = "higher_version"
# 并发执行所有任务
if tasks:
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(completed_tasks):
task = tasks[i]
version = task_to_version[task]
# 检查是否有异常
if isinstance(result, Exception):
logger.error(f"获取插件市场数据失败:{str(result)}")
continue
plugins = result
if plugins:
if version == "higher_version":
higher_version_plugins.extend(plugins) # 收集高版本插件
else:
base_version_plugins.extend(plugins) # 收集 v1 版本插件
return self._process_plugins_list(higher_version_plugins, base_version_plugins)
async def async_get_plugins_from_market(self, market: str,
package_version: Optional[str] = None,
force: bool = False) -> Optional[List[schemas.Plugin]]:
"""
异步从指定的市场获取插件信息
:param market: 市场的 URL 或标识
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
:param force: 是否强制刷新(忽略缓存)
:return: 返回插件的列表,若获取失败返回 []
"""
if not market:
return []
# 已安装插件
installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 获取在线插件
async with async_fresh(force):
online_plugins = await PluginHelper().async_get_plugins(market, package_version)
if online_plugins is None:
logger.warning(
f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
return []
ret_plugins = []
add_time = len(online_plugins)
for pid, plugin_info in online_plugins.items():
plugin = self._process_plugin_info(pid, plugin_info, market, installed_apps, add_time, package_version)
if plugin:
ret_plugins.append(plugin)
add_time -= 1
return ret_plugins
@staticmethod
def __set_and_check_auth_level(plugin: Union[schemas.Plugin, Type[Any]],
source: Optional[Union[dict, Type[Any]]] = None) -> bool:
"""
设置并检查插件的认证级别
:param plugin: 插件对象或包含 auth_level 属性的对象
:param source: 可选的字典对象或类对象,可能包含 "level""auth_level"
:return: 如果插件的认证级别有效且当前环境的认证级别满足要求,返回 True否则返回 False
"""
# 检查并赋值 source 中的 level 或 auth_level
if source:
if isinstance(source, dict) and "level" in source:
plugin.auth_level = source.get("level")
elif hasattr(source, "auth_level"):
plugin.auth_level = source.auth_level
# 如果 source 为空且 plugin 本身没有 auth_level直接返回 True
elif not hasattr(plugin, "auth_level"):
return True
# auth_level 级别说明
# 1 - 所有用户可见
# 2 - 站点认证用户可见
# 3 - 站点&密钥认证可见
# 99 - 站点&特殊密钥认证可见
# 如果当前站点认证级别大于 1 且插件级别为 99并存在插件公钥说明为特殊密钥认证通过密钥匹配进行认证
siteshelper = SitesHelper()
if siteshelper.auth_level > 1 and plugin.auth_level == 99 and hasattr(plugin, "plugin_public_key"):
plugin_id = plugin.id if isinstance(plugin, schemas.Plugin) else plugin.__name__
public_key = plugin.plugin_public_key
if public_key:
private_key = PluginManager.__get_plugin_private_key(plugin_id)
verify = RSAUtils.verify_rsa_keys(public_key=public_key, private_key=private_key)
return verify
# 如果当前站点认证级别小于插件级别,则返回 False
if siteshelper.auth_level < plugin.auth_level:
return False
return True
@staticmethod
def __get_plugin_private_key(plugin_id: str) -> Optional[str]:
"""
根据插件标识获取对应的私钥
:param plugin_id: 插件标识
:return: 对应的插件私钥,如果未找到则返回 None
"""
try:
# 将插件标识转换为大写并构建环境变量名称
env_var_name = f"PLUGIN_{plugin_id.upper()}_PRIVATE_KEY"
private_key = os.environ.get(env_var_name)
return private_key
except Exception as e:
logger.debug(f"获取插件 {plugin_id} 的私钥时发生错误:{e}")
return None
def clone_plugin(self, plugin_id: str, suffix: str, name: str, description: str,
version: str = None, icon: str = None) -> Tuple[bool, str]:
"""
创建插件分身
:param plugin_id: 原插件ID
:param suffix: 分身后缀
:param name: 分身名称
:param description: 分身描述
:param version: 自定义版本号
:param icon: 自定义图标URL
:return: (是否成功, 错误信息)
"""
try:
# 验证参数
if not plugin_id or not suffix:
return False, "插件ID和分身后缀不能为空"
# 检查原插件是否存在
if plugin_id not in self._plugins:
return False, f"原插件 {plugin_id} 不存在"
# 生成分身插件ID
clone_id = f"{plugin_id}{suffix.lower()}"
# 检查分身插件是否已存在
if self.is_plugin_exists(clone_id):
return False, f"分身插件 {clone_id} 已存在"
# 获取原插件目录
original_plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / plugin_id.lower()
if not original_plugin_dir.exists():
return False, f"原插件目录 {original_plugin_dir} 不存在"
# 创建分身插件目录
clone_plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / clone_id.lower()
# 复制插件目录
import shutil
shutil.copytree(original_plugin_dir, clone_plugin_dir)
logger.info(f"已复制插件目录:{original_plugin_dir} -> {clone_plugin_dir}")
# 修改插件文件内容
success, msg = self._modify_plugin_files(
plugin_dir=clone_plugin_dir,
original_id=plugin_id,
suffix=suffix.lower(),
name=name,
description=description,
version=version,
icon=icon
)
if not success:
# 如果修改失败,清理已创建的目录
if clone_plugin_dir.exists():
shutil.rmtree(clone_plugin_dir)
return False, msg
# 将分身插件添加到已安装列表
systemconfig = SystemConfigOper()
installed_plugins = systemconfig.get(SystemConfigKey.UserInstalledPlugins) or []
if clone_id not in installed_plugins:
installed_plugins.append(clone_id)
systemconfig.set(SystemConfigKey.UserInstalledPlugins, installed_plugins)
# 为分身插件创建初始配置(从原插件复制配置)
logger.info(f"正在为分身插件 {clone_id} 创建初始配置...")
original_config = self.get_plugin_config(plugin_id)
if original_config:
# 复制原插件配置作为分身插件的初始配置
clone_config = original_config.copy()
# 可以在这里修改一些默认值,比如禁用分身插件
# 默认禁用分身插件,让用户手动配置
clone_config['enable'] = False
clone_config['enabled'] = False
self.save_plugin_config(clone_id, clone_config, force=True)
logger.info(f"已为分身插件 {clone_id} 设置初始配置")
else:
logger.info(f"原插件 {plugin_id} 没有配置,分身插件 {clone_id} 将使用默认配置")
# 注册分身插件的API和服务
logger.info(f"正在注册分身插件 {clone_id} ...")
PluginManager().reload_plugin(clone_id)
# 确保分身插件正确初始化配置
if clone_id in self._running_plugins:
clone_instance = self._running_plugins[clone_id]
clone_config = self.get_plugin_config(clone_id)
if clone_config:
logger.info(f"正在为分身插件 {clone_id} 重新初始化配置...")
clone_instance.init_plugin(clone_config)
logger.info(f"分身插件 {clone_id} 配置重新初始化完成")
logger.info(f"插件分身 {clone_id} 创建成功")
return True, clone_id
except Exception as e:
logger.error(f"创建插件分身失败:{str(e)}")
return False, f"创建插件分身失败:{str(e)}"
def _modify_plugin_files(self, plugin_dir: Path, original_id: str, suffix: str,
name: str, description: str, version: str = None,
icon: str = None) -> Tuple[bool, str]:
"""
修改插件文件中的类名和相关信息
:param plugin_dir: 插件目录
:param original_id: 原插件ID
:param suffix: 分身后缀
:param name: 分身名称
:param description: 分身描述
:param version: 自定义版本号
:param icon: 自定义图标URL
:return: (是否成功, 错误信息)
"""
try:
# 获取原插件类
original_plugin_class = self._plugins.get(original_id)
if not original_plugin_class:
return False, f"无法获取原插件类 {original_id}"
# 获取原类名
original_class_name = original_plugin_class.__name__
clone_class_name = f"{original_class_name}{suffix}"
# 修改 __init__.py 文件
init_file = plugin_dir / "__init__.py"
if init_file.exists():
success, msg = self._modify_python_file(
file_path=init_file,
original_class_name=original_class_name,
clone_class_name=clone_class_name,
name=name,
description=description,
version=version,
icon=icon
)
if not success:
return False, msg
# 检查是否为联邦插件存在dist目录
dist_dir = plugin_dir / "dist"
if dist_dir.exists():
success, msg = self._modify_federation_files(
dist_dir=dist_dir,
original_class_name=original_class_name,
clone_class_name=clone_class_name
)
if not success:
return False, msg
return True, "文件修改成功"
except Exception as e:
logger.error(f"修改插件文件失败:{str(e)}")
return False, f"修改插件文件失败:{str(e)}"
@staticmethod
def _modify_python_file(file_path: Path, original_class_name: str,
clone_class_name: str, name: str, description: str,
version: str = None, icon: str = None) -> Tuple[bool, str]:
"""
修改Python文件中的类名和插件信息
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 替换类名
content = content.replace(f"class {original_class_name}", f"class {clone_class_name}")
# 替换插件名称和描述
import re
# 替换 plugin_name
if name:
content = re.sub(
r'plugin_name\s*=\s*["\'][^"\']*["\']',
f'plugin_name = "{name}"',
content
)
# 替换 plugin_desc
if description:
content = re.sub(
r'plugin_desc\s*=\s*["\'][^"\']*["\']',
f'plugin_desc = "{description}"',
content
)
# 替换 plugin_config_prefix如果存在
content = re.sub(
r'plugin_config_prefix\s*=\s*["\'][^"\']*["\']',
f'plugin_config_prefix = "{clone_class_name.lower()}_"',
content
)
# 替换 plugin_version如果提供了自定义版本
if version:
content = re.sub(
r'plugin_version\s*=\s*["\'][^"\']*["\']',
f'plugin_version = "{version}"',
content
)
# 替换 plugin_icon如果提供了自定义图标
if icon and icon.strip():
old_content = content
content = re.sub(
r'plugin_icon\s*=\s*["\'][^"\']*["\']',
f'plugin_icon = "{icon}"',
content
)
if old_content != content:
logger.info(f"已替换插件图标为: {icon}")
else:
logger.warning(f"插件图标替换失败,未找到匹配的图标设置")
else:
logger.info("未提供自定义图标,保持原插件图标")
# 添加分身标志
if "def init_plugin(self" in content:
init_index = content.index("def init_plugin(self")
# 在 def init_plugin(self 前添加 is_clone = True
content = content[:init_index] + "is_clone = True\n\n " + content[init_index:]
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.debug(f"已修改Python文件{file_path}")
return True, "Python文件修改成功"
except Exception as e:
logger.error(f"修改Python文件失败{str(e)}")
return False, f"修改Python文件失败{str(e)}"
def _modify_federation_files(self, dist_dir: Path, original_class_name: str,
clone_class_name: str) -> Tuple[bool, str]:
"""
修改联邦插件的前端文件
"""
try:
# 获取原始插件名(从类名推导)
original_plugin_name = original_class_name
clone_plugin_name = clone_class_name
# 遍历dist目录下的所有文件
for file_path in dist_dir.rglob("*"):
if not file_path.is_file():
continue
# 处理JS文件
if file_path.suffix == '.js':
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 替换类名引用(精确匹配)
content = content.replace(original_class_name, clone_class_name)
# 替换插件名引用(如果存在)
content = content.replace(f'"{original_plugin_name}"', f'"{clone_plugin_name}"')
content = content.replace(f"'{original_plugin_name}'", f"'{clone_plugin_name}'")
# 替换CSS key中的类名联邦插件特有
content = content.replace(f'css__{original_class_name}__', f'css__{clone_class_name}__')
# 替换可能的小写类名引用
content = content.replace(original_class_name.lower(), clone_class_name.lower())
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.debug(f"已修改联邦插件JS文件{file_path}")
except Exception as e:
logger.warning(f"修改联邦插件文件 {file_path} 失败:{str(e)}")
continue
# 处理CSS文件
elif file_path.suffix == '.css':
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 替换CSS中可能的类名引用
content = content.replace(original_class_name.lower(),
clone_class_name.lower()).replace(original_class_name,
clone_class_name)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.debug(f"已修改联邦插件CSS文件{file_path}")
except Exception as e:
logger.warning(f"修改联邦插件CSS文件 {file_path} 失败:{str(e)}")
continue
# 重命名构建文件(如果需要)
self._rename_federation_assets(dist_dir, original_class_name, clone_class_name)
return True, "联邦插件文件修改完成"
except Exception as e:
logger.error(f"修改联邦插件文件失败:{str(e)}")
return False, f"修改联邦插件文件失败:{str(e)}"
@staticmethod
def _rename_federation_assets(dist_dir: Path, original_class_name: str, clone_class_name: str):
"""
重命名联邦插件的资源文件,避免文件名冲突
"""
try:
# 查找包含原类名的文件并重命名
for file_path in dist_dir.glob("*"):
if not file_path.is_file():
continue
file_name = file_path.name
# 如果文件名包含原类名,则重命名
if original_class_name.lower() in file_name.lower():
new_name = file_name.replace(
original_class_name.lower(),
clone_class_name.lower()
)
new_path = file_path.parent / new_name
# 避免重命名冲突
if not new_path.exists():
file_path.rename(new_path)
logger.debug(f"重命名联邦插件文件:{file_name} -> {new_name}")
except Exception as e:
# 重命名失败不影响整体流程
logger.warning(f"重命名联邦插件资源文件失败:{str(e)}")