feat:插件查询协程处理

This commit is contained in:
jxxghp
2025-07-31 16:58:54 +08:00
parent a0e4b4a56e
commit 1e61e60d73
4 changed files with 852 additions and 118 deletions

View File

@@ -2,9 +2,11 @@ import mimetypes
import shutil
from typing import Annotated, Any, List, Optional
import aiofiles
from aiopath import AsyncPath
from fastapi import APIRouter, Depends, Header, HTTPException
from starlette import status
from starlette.responses import FileResponse
from starlette.responses import StreamingResponse
from app import schemas
from app.command import Command
@@ -12,7 +14,7 @@ from app.core.config import settings
from app.core.plugin import PluginManager
from app.core.security import verify_apikey, verify_token
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.db.user_oper import get_current_active_superuser, get_current_active_superuser_async
from app.factory import app
from app.helper.plugin import PluginHelper
from app.log import logger
@@ -136,13 +138,14 @@ def register_plugin(plugin_id: str):
@router.get("/", summary="所有插件", response_model=List[schemas.Plugin])
def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser),
state: Optional[str] = "all", force: bool = False) -> List[schemas.Plugin]:
async def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser_async),
state: Optional[str] = "all", force: bool = False) -> List[schemas.Plugin]:
"""
查询所有插件清单包括本地插件和在线插件插件状态installed, market, all
"""
# 本地插件
local_plugins = PluginManager().get_local_plugins()
plugin_manager = PluginManager()
local_plugins = plugin_manager.get_local_plugins()
# 已安装插件
installed_plugins = [plugin for plugin in local_plugins if plugin.installed]
if state == "installed":
@@ -151,7 +154,7 @@ def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser),
# 未安装的本地插件
not_installed_plugins = [plugin for plugin in local_plugins if not plugin.installed]
# 在线插件
online_plugins = PluginManager().get_online_plugins(force)
online_plugins = await plugin_manager.async_get_online_plugins(force)
if not online_plugins:
# 没有获取在线插件
if state == "market":
@@ -192,11 +195,11 @@ def installed(_: schemas.TokenPayload = Depends(get_current_active_superuser)) -
@router.get("/statistic", summary="插件安装统计", response_model=dict)
def statistic(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def statistic(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
插件安装统计
"""
return PluginHelper().get_statistic()
return await PluginHelper().async_get_statistic()
@router.get("/reload/{plugin_id}", summary="重新加载插件", response_model=schemas.Response)
@@ -222,12 +225,13 @@ def install(plugin_id: str,
# 已安装插件
install_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 首先检查插件是否已经存在,并且是否强制安装,否则只进行安装统计
plugin_helper = PluginHelper()
if not force and plugin_id in PluginManager().get_plugin_ids():
PluginHelper().install_reg(pid=plugin_id)
plugin_helper.install_reg(pid=plugin_id)
else:
# 插件不存在或需要强制安装,下载安装并注册插件
if repo_url:
state, msg = PluginHelper().install(pid=plugin_id, repo_url=repo_url)
state, msg = plugin_helper.install(pid=plugin_id, repo_url=repo_url)
# 安装失败则直接响应
if not state:
return schemas.Response(success=False, message=msg)
@@ -260,7 +264,8 @@ def plugin_form(plugin_id: str,
"""
根据插件ID获取插件配置表单或Vue组件URL
"""
plugin_instance = PluginManager().running_plugins.get(plugin_id)
plugin_manager = PluginManager()
plugin_instance = plugin_manager.running_plugins.get(plugin_id)
if not plugin_instance:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"插件 {plugin_id} 不存在或未加载")
@@ -271,7 +276,7 @@ def plugin_form(plugin_id: str,
return {
"render_mode": render_mode,
"conf": conf,
"model": PluginManager().get_plugin_config(plugin_id) or model
"model": plugin_manager.get_plugin_config(plugin_id) or model
}
except Exception as e:
logger.error(f"插件 {plugin_id} 调用方法 get_form 出错: {str(e)}")
@@ -343,7 +348,7 @@ def reset_plugin(plugin_id: str,
@router.get("/file/{plugin_id}/{filepath:path}", summary="获取插件静态文件")
def plugin_static_file(plugin_id: str, filepath: str):
async def plugin_static_file(plugin_id: str, filepath: str):
"""
获取插件静态文件
"""
@@ -352,11 +357,11 @@ def plugin_static_file(plugin_id: str, filepath: str):
logger.warning(f"Static File API: Path traversal attempt detected: {plugin_id}/{filepath}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
plugin_base_dir = settings.ROOT_PATH / "app" / "plugins" / plugin_id.lower()
plugin_base_dir = AsyncPath(settings.ROOT_PATH) / "app" / "plugins" / plugin_id.lower()
plugin_file_path = plugin_base_dir / filepath
if not plugin_file_path.exists():
if not await plugin_file_path.exists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{plugin_file_path} 不存在")
if not plugin_file_path.is_file():
if not await plugin_file_path.is_file():
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"{plugin_file_path} 不是文件")
# 判断 MIME 类型
@@ -371,9 +376,20 @@ def plugin_static_file(plugin_id: str, filepath: str):
response_type = 'application/octet-stream'
try:
return FileResponse(plugin_file_path, media_type=response_type)
# 异步生成器函数,用于流式读取文件
async def file_generator():
async with aiofiles.open(plugin_file_path, mode='rb') as file:
# 8KB 块大小
while chunk := await file.read(8192):
yield chunk
return StreamingResponse(
file_generator(),
media_type=response_type,
headers={"Content-Disposition": f"inline; filename={plugin_file_path.name}"}
)
except Exception as e:
logger.error(f"Error creating/sending FileResponse for {plugin_file_path}: {e}", exc_info=True)
logger.error(f"Error creating/sending StreamingResponse for {plugin_file_path}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal Server Error")
@@ -432,7 +448,8 @@ def delete_plugin_folder(folder_name: str, _: schemas.TokenPayload = Depends(get
@router.put("/folders/{folder_name}/plugins", summary="更新文件夹中的插件", response_model=schemas.Response)
def update_folder_plugins(folder_name: str, plugin_ids: List[str], _: schemas.TokenPayload = Depends(get_current_active_superuser)) -> Any:
def update_folder_plugins(folder_name: str, plugin_ids: List[str],
_: schemas.TokenPayload = Depends(get_current_active_superuser)) -> Any:
"""
更新指定文件夹中的插件列表
"""

View File

@@ -10,7 +10,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, Union, Callable, Tuple
from app.helper.sites import SitesHelper # noqa
from fastapi import HTTPException
from starlette import status
from watchdog.events import FileSystemEventHandler
@@ -22,6 +21,7 @@ from app.core.event import eventmanager, Event
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
@@ -850,8 +850,6 @@ class PluginManager(metaclass=Singleton):
if not settings.PLUGIN_MARKET:
return []
# 返回值
all_plugins = []
# 用于存储高于 v1 版本的插件(如 v2, v3 等)
higher_version_plugins = []
# 用于存储 v1 版本插件
@@ -884,25 +882,7 @@ class PluginManager(metaclass=Singleton):
else:
base_version_plugins.extend(plugins) # 收集 v1 版本插件
# 优先处理高版本插件
all_plugins.extend(higher_version_plugins)
# 将未出现在高版本插件列表中的 v1 插件加入 all_plugins
higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins}
all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids])
# 去重
all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values())
# 所有插件按 repo 在设置中的顺序排序
all_plugins.sort(
key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0
)
# 相同 ID 的插件保留版本号最大的版本
max_versions = {}
for p in all_plugins:
if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]):
max_versions[p.id] = p.plugin_version
result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]]
logger.info(f"共获取到 {len(result)} 个线上插件")
return result
return self._process_plugins_list(higher_version_plugins, base_version_plugins)
def get_local_plugins(self) -> List[schemas.Plugin]:
"""
@@ -1032,83 +1012,215 @@ class PluginManager(metaclass=Singleton):
ret_plugins = []
add_time = len(online_plugins)
for pid, plugin_info in online_plugins.items():
if not isinstance(plugin_info, dict):
plugin = self._process_plugin_info(pid, plugin_info, market, installed_apps, add_time, package_version)
if plugin:
ret_plugins.append(plugin)
add_time -= 1
return ret_plugins
@staticmethod
def _process_plugins_list(higher_version_plugins: List[schemas.Plugin],
base_version_plugins: List[schemas.Plugin]) -> List[schemas.Plugin]:
"""
处理插件列表:合并、去重、排序、保留最高版本
:param higher_version_plugins: 高版本插件列表
:param base_version_plugins: 基础版本插件列表
:return: 处理后的插件列表
"""
# 优先处理高版本插件
all_plugins = []
all_plugins.extend(higher_version_plugins)
# 将未出现在高版本插件列表中的 v1 插件加入 all_plugins
higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins}
all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids])
# 去重
all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values())
# 所有插件按 repo 在设置中的顺序排序
all_plugins.sort(
key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0
)
# 相同 ID 的插件保留版本号最大的版本
max_versions = {}
for p in all_plugins:
if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]):
max_versions[p.id] = p.plugin_version
result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]]
logger.info(f"共获取到 {len(result)} 个线上插件")
return result
def _process_plugin_info(self, pid: str, plugin_info: dict, market: str,
installed_apps: List[str], add_time: int,
package_version: Optional[str] = None) -> Optional[schemas.Plugin]:
"""
处理单个插件信息,创建 schemas.Plugin 对象
:param pid: 插件ID
:param plugin_info: 插件信息字典
:param market: 市场URL
:param installed_apps: 已安装插件列表
:param add_time: 添加顺序
:param package_version: 包版本
:return: 创建的插件对象如果验证失败返回None
"""
if not isinstance(plugin_info, dict):
return None
# 如 package_version 为空,则需要判断插件是否兼容当前版本
if not package_version:
if plugin_info.get(settings.VERSION_FLAG) is not True:
# 插件当前版本不兼容
return None
# 运行状插件
plugin_obj = self._running_plugins.get(pid)
# 非运行态插件
plugin_static = self._plugins.get(pid)
# 基本属性
plugin = schemas.Plugin()
# ID
plugin.id = pid
# 安装状态
if pid in installed_apps and plugin_static:
plugin.installed = True
else:
plugin.installed = False
# 是否有新版本
plugin.has_update = False
if plugin_static:
installed_version = getattr(plugin_static, "plugin_version")
if StringUtils.compare_version(installed_version, "<", plugin_info.get("version")):
# 需要更新
plugin.has_update = True
# 运行状态
if plugin_obj and hasattr(plugin_obj, "get_state"):
try:
state = plugin_obj.get_state()
except Exception as e:
logger.error(f"获取插件 {pid} 状态出错:{str(e)}")
state = False
plugin.state = state
else:
plugin.state = False
# 是否有详情页面
plugin.has_page = False
if plugin_obj and hasattr(plugin_obj, "get_page"):
if ObjectUtils.check_method(plugin_obj.get_page):
plugin.has_page = True
# 公钥
if plugin_info.get("key"):
plugin.plugin_public_key = plugin_info.get("key")
# 权限
if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_info):
return None
# 名称
if plugin_info.get("name"):
plugin.plugin_name = plugin_info.get("name")
# 描述
if plugin_info.get("description"):
plugin.plugin_desc = plugin_info.get("description")
# 版本
if plugin_info.get("version"):
plugin.plugin_version = plugin_info.get("version")
# 图标
if plugin_info.get("icon"):
plugin.plugin_icon = plugin_info.get("icon")
# 标签
if plugin_info.get("labels"):
plugin.plugin_label = plugin_info.get("labels")
# 作者
if plugin_info.get("author"):
plugin.plugin_author = plugin_info.get("author")
# 更新历史
if plugin_info.get("history"):
plugin.history = plugin_info.get("history")
# 仓库链接
plugin.repo_url = market
# 本地标志
plugin.is_local = False
# 添加顺序
plugin.add_time = add_time
return plugin
async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]:
"""
异步获取所有在线插件信息
"""
if not settings.PLUGIN_MARKET:
return []
# 用于存储高于 v1 版本的插件(如 v2, v3 等)
higher_version_plugins = []
# 用于存储 v1 版本插件
base_version_plugins = []
# 使用异步并发获取线上插件
import asyncio
tasks = []
task_to_version = {}
for m in settings.PLUGIN_MARKET.split(","):
if not m:
continue
# 如 package_version 为空,则需要判断插件是否兼容当前版本
if not package_version:
if plugin_info.get(settings.VERSION_FLAG) is not True:
# 插件当前版本不兼容
# 创建任务获取 v1 版本插件
base_task = asyncio.create_task(self.async_get_plugins_from_market(m, None, force))
tasks.append(base_task)
task_to_version[base_task] = "base_version"
# 创建任务获取高版本插件(如 v2、v3
if settings.VERSION_FLAG:
higher_version_task = asyncio.create_task(
self.async_get_plugins_from_market(m, settings.VERSION_FLAG, force))
tasks.append(higher_version_task)
task_to_version[higher_version_task] = "higher_version"
# 并发执行所有任务
if tasks:
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(completed_tasks):
task = tasks[i]
version = task_to_version[task]
# 检查是否有异常
if isinstance(result, Exception):
logger.error(f"获取插件市场数据失败:{str(result)}")
continue
# 运行状插件
plugin_obj = self._running_plugins.get(pid)
# 非运行态插件
plugin_static = self._plugins.get(pid)
# 基本属性
plugin = schemas.Plugin()
# ID
plugin.id = pid
# 安装状态
if pid in installed_apps and plugin_static:
plugin.installed = True
else:
plugin.installed = False
# 是否有新版本
plugin.has_update = False
if plugin_static:
installed_version = getattr(plugin_static, "plugin_version")
if StringUtils.compare_version(installed_version, "<", plugin_info.get("version")):
# 需要更新
plugin.has_update = True
# 运行状态
if plugin_obj and hasattr(plugin_obj, "get_state"):
try:
state = plugin_obj.get_state()
except Exception as e:
logger.error(f"获取插件 {pid} 状态出错:{str(e)}")
state = False
plugin.state = state
else:
plugin.state = False
# 是否有详情页面
plugin.has_page = False
if plugin_obj and hasattr(plugin_obj, "get_page"):
if ObjectUtils.check_method(plugin_obj.get_page):
plugin.has_page = True
# 公钥
if plugin_info.get("key"):
plugin.plugin_public_key = plugin_info.get("key")
# 权限
if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_info):
continue
# 名称
if plugin_info.get("name"):
plugin.plugin_name = plugin_info.get("name")
# 描述
if plugin_info.get("description"):
plugin.plugin_desc = plugin_info.get("description")
# 版本
if plugin_info.get("version"):
plugin.plugin_version = plugin_info.get("version")
# 图标
if plugin_info.get("icon"):
plugin.plugin_icon = plugin_info.get("icon")
# 标签
if plugin_info.get("labels"):
plugin.plugin_label = plugin_info.get("labels")
# 作者
if plugin_info.get("author"):
plugin.plugin_author = plugin_info.get("author")
# 更新历史
if plugin_info.get("history"):
plugin.history = plugin_info.get("history")
# 仓库链接
plugin.repo_url = market
# 本地标志
plugin.is_local = False
# 添加顺序
plugin.add_time = add_time
# 汇总
ret_plugins.append(plugin)
plugins = result
if plugins:
if version == "higher_version":
higher_version_plugins.extend(plugins) # 收集高版本插件
else:
base_version_plugins.extend(plugins) # 收集 v1 版本插件
return self._process_plugins_list(higher_version_plugins, base_version_plugins)
async def async_get_plugins_from_market(self, market: str,
package_version: Optional[str] = None,
force: bool = False) -> Optional[List[schemas.Plugin]]:
"""
异步从指定的市场获取插件信息
:param market: 市场的 URL 或标识
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
:param force: 是否强制刷新(忽略缓存)
:return: 返回插件的列表,若获取失败返回 []
"""
if not market:
return []
# 已安装插件
installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 获取在线插件
online_plugins = await PluginHelper().async_get_plugins(market, package_version, force)
if online_plugins is None:
logger.warning(
f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接")
return []
ret_plugins = []
add_time = len(online_plugins)
for pid, plugin_info in online_plugins.items():
plugin = self._process_plugin_info(pid, plugin_info, market, installed_apps, add_time, package_version)
if plugin:
ret_plugins.append(plugin)
add_time -= 1
return ret_plugins

View File

@@ -7,6 +7,10 @@ import traceback
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set
import aiofiles
import aioshutil
import httpx
from aiopath import AsyncPath
from packaging.specifiers import SpecifierSet, InvalidSpecifier
from packaging.version import Version, InvalidVersion
from pkg_resources import Requirement, working_set
@@ -17,7 +21,7 @@ from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
@@ -95,7 +99,8 @@ class PluginHelper(metaclass=WeakSingleton):
return None
return {}
def get_plugin_package_version(self, pid: str, repo_url: str, package_version: Optional[str] = None) -> Optional[str]:
def get_plugin_package_version(self, pid: str, repo_url: str,
package_version: Optional[str] = None) -> Optional[str]:
"""
检查并获取指定插件的可用版本,支持多版本优先级加载和版本兼容性检测
1. 如果未指定版本,则使用系统配置的默认版本(通过 settings.VERSION_FLAG 设置)
@@ -743,3 +748,602 @@ class PluginHelper(metaclass=WeakSingleton):
:return: 标准化后的包名
"""
return name.lower().replace("-", "_") if name else name
async def async_get_plugin_package_version(self, pid: str, repo_url: str,
package_version: Optional[str] = None) -> Optional[str]:
"""
异步版本的获取插件版本方法,功能同 get_plugin_package_version
"""
if not package_version:
package_version = settings.VERSION_FLAG
if pid in (await self.async_get_plugins(repo_url, package_version) or []):
return package_version
plugin = (await self.async_get_plugins(repo_url) or {}).get(pid, None)
if plugin and plugin.get(package_version) is True:
return ""
return None
@staticmethod
async def __async_request_with_fallback(url: str,
headers: Optional[dict] = None,
timeout: Optional[int] = 60,
is_api: bool = False) -> Optional[httpx.Response]:
"""
使用自动降级策略,异步请求资源,优先级依次为镜像站、代理、直连
:param url: 目标URL
:param headers: 请求头信息
:param timeout: 请求超时时间
:param is_api: 是否为GitHub API请求API请求不走镜像站
:return: 请求成功则返回 Response失败返回 None
"""
strategies = []
# 1. 尝试使用镜像站镜像站一般不支持API请求因此API请求直接跳过镜像站
if not is_api and settings.GITHUB_PROXY:
proxy_url = f"{UrlUtils.standardize_base_url(settings.GITHUB_PROXY)}{url}"
strategies.append(("镜像站", proxy_url, {"headers": headers, "timeout": timeout}))
# 2. 尝试使用代理
if settings.PROXY_HOST:
strategies.append(("代理", url, {"headers": headers, "proxies": settings.PROXY, "timeout": timeout}))
# 3. 最后尝试直连
strategies.append(("直连", url, {"headers": headers, "timeout": timeout}))
# 遍历策略并尝试请求
for strategy_name, target_url, request_params in strategies:
logger.debug(f"[GitHub] 尝试使用策略:{strategy_name} 请求 URL{target_url}")
try:
res = await AsyncRequestUtils(**request_params).get_res(url=target_url, raise_exception=True)
logger.debug(f"[GitHub] 请求成功,策略:{strategy_name}, URL: {target_url}")
return res
except Exception as e:
logger.error(f"[GitHub] 请求失败,策略:{strategy_name}, URL: {target_url},错误:{str(e)}")
logger.error(f"[GitHub] 所有策略均请求失败URL: {url},请检查网络连接或 GitHub 配置")
return None
async def async_get_plugins(self, repo_url: str, package_version: Optional[str] = None,
force: bool = False) -> Optional[Dict[str, dict]]:
"""
异步获取Github所有最新插件列表
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
:param force: 是否强制刷新,忽略缓存
"""
# 异步版本直接调用不带缓存的版本(缓存在异步环境下可能有并发问题)
if force:
return await self._async_get_plugins_uncached(repo_url, package_version)
return await self._async_get_plugins_cached(repo_url, package_version)
@cached(maxsize=64, ttl=1800)
async def _async_get_plugins_cached(self, repo_url: str,
package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
获取Github所有最新插件列表使用缓存
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
"""
return await self._async_get_plugins_uncached(repo_url, package_version)
async def _async_get_plugins_uncached(self, repo_url: str,
package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
异步获取Github所有最新插件列表不使用缓存
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
"""
if not repo_url:
return None
user, repo = self.get_repo_info(repo_url)
if not user or not repo:
return None
raw_url = self._base_url.format(user=user, repo=repo)
package_url = f"{raw_url}package.{package_version}.json" if package_version else f"{raw_url}package.json"
res = await self.__async_request_with_fallback(package_url,
headers=settings.REPO_GITHUB_HEADERS(repo=f"{user}/{repo}"))
if res is None:
return None
if res:
content = res.text
try:
return json.loads(content)
except json.JSONDecodeError:
if "404: Not Found" not in content:
logger.warn(f"插件包数据解析失败:{content}")
return None
return {}
async def async_get_statistic(self) -> Dict:
"""
异步获取插件安装统计
"""
if not settings.PLUGIN_STATISTIC_SHARE:
return {}
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=10).get_res(self._install_statistic)
if res and res.status_code == 200:
return res.json()
return {}
async def async_install_reg(self, pid: str) -> bool:
"""
异步安装插件统计
"""
if not settings.PLUGIN_STATISTIC_SHARE:
return False
if not pid:
return False
install_reg_url = self._install_reg.format(pid=pid)
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5).get_res(install_reg_url)
if res and res.status_code == 200:
return True
return False
async def async_install_report(self) -> bool:
"""
异步上报存量插件安装统计
"""
if not settings.PLUGIN_STATISTIC_SHARE:
return False
plugins = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins)
if not plugins:
return False
res = await AsyncRequestUtils(proxies=settings.PROXY,
content_type="application/json",
timeout=5).post(self._install_report,
json={"plugins": [{"plugin_id": plugin} for plugin in plugins]})
return True if res else False
async def __async_get_file_list(self, pid: str, user_repo: str, package_version: Optional[str] = None) -> \
Tuple[Optional[list], Optional[str]]:
"""
异步获取插件的文件列表
:param pid: 插件 ID
:param user_repo: GitHub 仓库的 user/repo 路径
:return: (文件列表, 错误信息)
"""
file_api = f"https://api.github.com/repos/{user_repo}/contents/plugins"
# 如果 package_version 存在(如 "v2"),则加上版本号
if package_version:
file_api += f".{package_version}"
file_api += f"/{pid}"
res = await self.__async_request_with_fallback(file_api,
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo),
is_api=True,
timeout=30)
if res is None:
return None, "连接仓库失败"
elif res.status_code != 200:
return None, f"连接仓库失败:{res.status_code} - " \
f"{'超出速率限制请设置Github Token或稍后重试' if res.status_code == 403 else res.text}"
try:
ret = res.json()
if isinstance(ret, list) and len(ret) > 0 and "message" not in ret[0]:
return ret, ""
else:
return None, "插件在仓库中不存在或返回数据格式不正确"
except Exception as e:
logger.error(f"插件数据解析失败:{e}")
return None, "插件数据解析失败"
async def __async_download_files(self, pid: str, file_list: List[dict], user_repo: str,
package_version: Optional[str] = None,
skip_requirements: bool = False) -> Tuple[bool, str]:
"""
异步下载插件文件
:param pid: 插件 ID
:param file_list: 要下载的文件列表,包含文件的元数据(包括下载链接)
:param user_repo: GitHub 仓库的 user/repo 路径
:param skip_requirements: 是否跳过 requirements.txt 文件的下载
:return: (是否成功, 错误信息)
"""
if not file_list:
return False, "文件列表为空"
# 使用栈结构来替代递归调用,避免递归深度过大问题
stack = [(pid, file_list)]
while stack:
current_pid, current_file_list = stack.pop()
for item in current_file_list:
# 跳过 requirements.txt 的下载
if skip_requirements and item.get("name") == "requirements.txt":
continue
if item.get("download_url"):
logger.debug(f"正在下载文件:{item.get('path')}")
res = await self.__async_request_with_fallback(item.get('download_url'),
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
if not res:
return False, f"文件 {item.get('path')} 下载失败!"
elif res.status_code != 200:
return False, f"下载文件 {item.get('path')} 失败:{res.status_code}"
# 确保文件路径不包含版本号(如 v2、v3如果有 package_version移除路径中的版本号
relative_path = item.get("path")
if package_version:
relative_path = relative_path.replace(f"plugins.{package_version}", "plugins", 1)
# 创建插件文件夹并写入文件
file_path = AsyncPath(settings.ROOT_PATH) / "app" / relative_path
await file_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(file_path, "w", encoding="utf-8") as f:
await f.write(res.text)
logger.debug(f"文件 {item.get('path')} 下载成功,保存路径:{file_path}")
else:
# 如果是子目录,则将子目录内容加入栈中继续处理
sub_list, msg = await self.__async_get_file_list(f"{current_pid}/{item.get('name')}", user_repo,
package_version)
if not sub_list:
return False, msg
stack.append((f"{current_pid}/{item.get('name')}", sub_list))
return True, ""
async def __async_download_and_install_requirements(self, requirements_file_info: dict, pid: str, user_repo: str) \
-> Tuple[bool, str]:
"""
异步下载并安装 requirements.txt 文件中的依赖
:param requirements_file_info: requirements.txt 文件的元数据信息
:param pid: 插件 ID
:param user_repo: GitHub 仓库的 user/repo 路径
:return: (是否成功, 错误信息)
"""
# 下载 requirements.txt
res = await self.__async_request_with_fallback(requirements_file_info.get("download_url"),
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
if not res:
return False, "requirements.txt 文件下载失败"
elif res.status_code != 200:
return False, f"下载 requirements.txt 文件失败:{res.status_code}"
requirements_txt = res.text
if requirements_txt.strip():
# 保存并安装依赖
requirements_file_path = AsyncPath(PLUGIN_DIR) / pid.lower() / "requirements.txt"
await requirements_file_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(requirements_file_path, "w", encoding="utf-8") as f:
await f.write(requirements_txt)
return self.pip_install_with_fallback(Path(requirements_file_path))
return True, "" # 如果 requirements.txt 为空,视作成功
async def __async_backup_plugin(self, pid: str) -> str:
"""
异步备份旧插件目录
:param pid: 插件 ID
:return: 备份目录路径
"""
plugin_dir = AsyncPath(PLUGIN_DIR) / pid
backup_dir = AsyncPath(settings.TEMP_PATH) / "plugin_backup" / pid
if await plugin_dir.exists():
# 备份时清理已有的备份目录,防止残留文件影响
if await backup_dir.exists():
await aioshutil.rmtree(backup_dir, ignore_errors=True)
logger.debug(f"{pid} 旧的备份目录已清理 {backup_dir}")
# 异步复制目录
await self._async_copytree(plugin_dir, backup_dir)
logger.debug(f"{pid} 插件已备份到 {backup_dir}")
return str(backup_dir) if await backup_dir.exists() else None
@staticmethod
async def __async_restore_plugin(pid: str, backup_dir: str):
"""
异步还原旧插件目录
:param pid: 插件 ID
:param backup_dir: 备份目录路径
"""
plugin_dir = AsyncPath(PLUGIN_DIR) / pid
if await plugin_dir.exists():
await aioshutil.rmtree(plugin_dir, ignore_errors=True)
logger.debug(f"{pid} 已清理插件目录 {plugin_dir}")
backup_path = AsyncPath(backup_dir)
if await backup_path.exists():
await PluginHelper._async_copytree(backup_path, plugin_dir)
logger.debug(f"{pid} 已还原插件目录 {plugin_dir}")
await aioshutil.rmtree(backup_path, ignore_errors=True)
logger.debug(f"{pid} 已删除备份目录 {backup_dir}")
@staticmethod
async def __async_remove_old_plugin(pid: str):
"""
异步删除旧插件
:param pid: 插件 ID
"""
plugin_dir = AsyncPath(PLUGIN_DIR) / pid
if await plugin_dir.exists():
await aioshutil.rmtree(plugin_dir, ignore_errors=True)
async def _async_copytree(self, src: AsyncPath, dst: AsyncPath):
"""
异步递归复制目录
:param src: 源目录
:param dst: 目标目录
"""
if not await src.exists():
return
await dst.mkdir(parents=True, exist_ok=True)
async for item in src.iterdir():
dst_item = dst / item.name
if await item.is_dir():
await self._async_copytree(item, dst_item)
else:
async with aiofiles.open(item, 'rb') as src_file:
content = await src_file.read()
async with aiofiles.open(dst_item, 'wb') as dst_file:
await dst_file.write(content)
async def __async_install_dependencies_if_required(self, pid: str) -> Tuple[bool, bool, str]:
"""
异步安装插件依赖。
:param pid: 插件 ID
:return: (是否存在依赖,安装是否成功, 错误信息)
"""
# 定位插件目录和依赖文件
plugin_dir = AsyncPath(PLUGIN_DIR) / pid.lower()
requirements_file = plugin_dir / "requirements.txt"
# 检查是否存在 requirements.txt 文件
if await requirements_file.exists():
logger.info(f"{pid} 存在依赖,开始尝试安装依赖")
success, error_message = self.pip_install_with_fallback(Path(requirements_file))
if success:
return True, True, ""
else:
return True, False, error_message
return False, False, "不存在依赖"
async def async_install_dependencies(self, dependencies: List[str]) -> Tuple[bool, str]:
"""
异步安装指定的依赖项列表
:param dependencies: 需要安装或更新的依赖项列表
:return: (success, message)
"""
if not dependencies:
return False, "没有传入需要安装的依赖项"
try:
logger.debug(f"需要安装或更新的依赖项:{dependencies}")
# 创建临时的 requirements.txt 文件用于批量安装
requirements_temp_file = AsyncPath(settings.TEMP_PATH) / "plugin_dependencies" / "requirements.txt"
await requirements_temp_file.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(requirements_temp_file, "w", encoding="utf-8") as f:
for dep in dependencies:
await f.write(dep + "\n")
try:
# 使用自动降级策略安装依赖
return self.pip_install_with_fallback(Path(requirements_temp_file))
finally:
# 删除临时文件
await requirements_temp_file.unlink()
except Exception as e:
logger.error(f"安装依赖项时发生错误:{e}")
return False, f"安装依赖项时发生错误:{e}"
async def __async_find_plugin_dependencies(self) -> Dict[str, str]:
"""
异步收集所有插件的依赖项
遍历 plugins 目录下的所有插件,查找存在 requirements.txt 的插件目录
,并解析其中的依赖项,同时将所有插件的依赖项合并到字典中,方便后续统一处理
:return: 依赖项字典,格式为 {package_name: set(version_specifiers)}
"""
dependencies = {}
try:
install_plugins = {
plugin_id.lower() # 对应插件的小写目录名
for plugin_id in SystemConfigOper().get(
SystemConfigKey.UserInstalledPlugins
) or []
}
plugin_dir_path = AsyncPath(PLUGIN_DIR)
async for plugin_dir in plugin_dir_path.iterdir():
if await plugin_dir.is_dir():
requirements_file = plugin_dir / "requirements.txt"
if await requirements_file.exists():
if plugin_dir.name not in install_plugins:
# 这个插件不在安装列表中 忽略它的依赖
logger.debug(f"忽略插件 {plugin_dir.name} 的依赖")
continue
# 解析当前插件的 requirements.txt获取依赖项
plugin_deps = await self.__async_parse_requirements(requirements_file)
for pkg_name, version_specifiers in plugin_deps.items():
if pkg_name in dependencies:
# 更新已存在的包的版本约束集合
dependencies[pkg_name].update(version_specifiers)
else:
# 添加新的包及其版本约束
dependencies[pkg_name] = set(version_specifiers)
return self.__merge_dependencies(dependencies)
except Exception as e:
logger.error(f"收集插件依赖项时发生错误:{e}")
return {}
async def __async_parse_requirements(self, requirements_file: AsyncPath) -> Dict[str, List[str]]:
"""
异步解析 requirements.txt 文件,返回依赖项字典
使用 packaging 库解析每一行依赖项,提取包名和版本约束
对于无法解析的行,记录警告日志,便于后续检查
:param requirements_file: requirements.txt 文件的路径
:return: 依赖项字典,格式为 {package_name: [version_specifier]}
"""
dependencies = {}
try:
async with aiofiles.open(requirements_file, "r", encoding="utf-8") as f:
async for line in f:
line = str(line).strip()
if line and not line.startswith('#'):
# 使用 packaging 库解析依赖项
try:
req = Requirement(line)
pkg_name = self.__standardize_pkg_name(req.name)
version_specifier = str(req.specifier)
if pkg_name in dependencies:
dependencies[pkg_name].append(version_specifier)
else:
dependencies[pkg_name] = [version_specifier]
except Exception as e:
logger.debug(f"无法解析依赖项 '{line}'{e}")
return dependencies
except Exception as e:
logger.error(f"解析 requirements.txt 时发生错误:{e}")
return {}
async def async_find_missing_dependencies(self) -> List[str]:
"""
异步收集所有需要安装或更新的依赖项
1. 收集所有插件的依赖项,合并版本约束
2. 获取已安装的包及其版本
3. 比较已安装的包与所需的依赖项,找出需要安装或升级的包
:return: 需要安装或更新的依赖项列表,例如 ["package1>=1.0.0", "package2"]
"""
try:
# 收集所有插件的依赖项
plugin_dependencies = await self.__async_find_plugin_dependencies() # 返回格式为 {package_name: version_specifier}
# 获取已安装的包及其版本
installed_packages = self.__get_installed_packages() # 返回格式为 {package_name: Version}
# 需要安装或更新的依赖项列表
dependencies_to_install = []
for pkg_name, version_specifier in plugin_dependencies.items():
spec_set = SpecifierSet(version_specifier)
installed_version = installed_packages.get(pkg_name)
if installed_version is None:
# 包未安装,需要安装
if version_specifier:
dependencies_to_install.append(f"{pkg_name}{version_specifier}")
else:
dependencies_to_install.append(pkg_name)
elif not spec_set.contains(installed_version, prereleases=True):
# 已安装的版本不满足版本约束,需要升级或降级
if version_specifier:
dependencies_to_install.append(f"{pkg_name}{version_specifier}")
else:
dependencies_to_install.append(pkg_name)
# 已安装的版本满足要求,无需操作
return dependencies_to_install
except Exception as e:
logger.error(f"收集所有需要安装或更新的依赖项时发生错误:{e}")
return []
async def async_install(self, pid: str, repo_url: str, package_version: Optional[str] = None,
force_install: bool = False) -> Tuple[bool, str]:
"""
异步安装插件,包括依赖安装和文件下载,相关资源支持自动降级策略
1. 检查并获取插件的指定版本,确认版本兼容性
2. 从 GitHub 获取文件列表(包括 requirements.txt
3. 删除旧的插件目录(如非强制安装则进行备份)
4. 下载并预安装 requirements.txt 中的依赖(如果存在)
5. 下载并安装插件的其他文件
6. 再次尝试安装依赖(确保安装完整)
:param pid: 插件 ID
:param repo_url: 插件仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如不指定则默认使用系统配置的版本
:param force_install: 是否强制安装插件,默认不启用,启用时不进行备份和恢复操作
:return: (是否成功, 错误信息)
"""
if SystemUtils.is_frozen():
return False, "可执行文件模式下,只能安装本地插件"
# 验证参数
if not pid or not repo_url:
return False, "参数错误"
# 从 GitHub 的 repo_url 获取用户和项目名
user, repo = self.get_repo_info(repo_url)
if not user or not repo:
return False, "不支持的插件仓库地址格式"
user_repo = f"{user}/{repo}"
if not package_version:
package_version = settings.VERSION_FLAG
# 1. 优先检查指定版本的插件
package_version = await self.async_get_plugin_package_version(pid, repo_url, package_version)
# 如果 package_version 为None说明没有找到匹配的插件
if package_version is None:
msg = f"{pid} 没有找到适用于当前版本的插件"
logger.debug(msg)
return False, msg
# package_version 为空,表示从 package.json 中找到插件
elif package_version == "":
logger.debug(f"{pid} 从 package.json 中找到适用于当前版本的插件")
else:
logger.debug(f"{pid} 从 package.{package_version}.json 中找到适用于当前版本的插件")
# 2. 获取插件文件列表(包括 requirements.txt
file_list, msg = await self.__async_get_file_list(pid.lower(), user_repo, package_version)
if not file_list:
return False, msg
# 3. 删除旧的插件目录,如果不强制安装则备份
backup_dir = None
if not force_install:
backup_dir = await self.__async_backup_plugin(pid.lower())
await self.__async_remove_old_plugin(pid.lower())
# 4. 查找并安装 requirements.txt 中的依赖,确保插件环境的依赖尽可能完整。依赖安装可能失败且不影响插件安装,目前只记录日志
requirements_file_info = next((f for f in file_list if f.get("name") == "requirements.txt"), None)
if requirements_file_info:
logger.debug(f"{pid} 发现 requirements.txt提前下载并预安装依赖")
success, message = await self.__async_download_and_install_requirements(requirements_file_info,
pid, user_repo)
if not success:
logger.debug(f"{pid} 依赖预安装失败:{message}")
else:
logger.debug(f"{pid} 依赖预安装成功")
# 5. 下载插件的其他文件
logger.info(f"{pid} 准备开始下载插件文件")
success, message = await self.__async_download_files(pid.lower(), file_list, user_repo, package_version, True)
if not success:
logger.error(f"{pid} 下载插件文件失败:{message}")
if backup_dir:
await self.__async_restore_plugin(pid.lower(), backup_dir)
logger.warning(f"{pid} 插件安装失败,已还原备份插件")
else:
await self.__async_remove_old_plugin(pid.lower())
logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装")
return False, message
else:
logger.info(f"{pid} 下载插件文件成功")
# 6. 插件文件安装成功后,再次尝试安装依赖,避免因为遗漏依赖导致的插件运行问题,目前依旧只记录日志
dependencies_exist, success, message = await self.__async_install_dependencies_if_required(pid)
if dependencies_exist:
if not success:
logger.error(f"{pid} 依赖安装失败:{message}")
if backup_dir:
await self.__async_restore_plugin(pid.lower(), backup_dir)
logger.warning(f"{pid} 插件安装失败,已还原备份插件")
else:
await self.__async_remove_old_plugin(pid.lower())
logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装")
else:
logger.info(f"{pid} 依赖安装成功")
# 插件安装成功后,统计安装信息
await self.async_install_reg(pid)
return True, ""

View File

@@ -7,6 +7,7 @@ passlib~=1.7.4
PyJWT~=2.10.1
python-multipart~=0.0.9
aiofiles~=24.1.0
aioshutil~=1.5
alembic~=1.16.2
bcrypt~=4.0.1
regex~=2024.11.6