diff --git a/app/api/endpoints/plugin.py b/app/api/endpoints/plugin.py index f8058f22..dd11f9c9 100644 --- a/app/api/endpoints/plugin.py +++ b/app/api/endpoints/plugin.py @@ -2,9 +2,11 @@ import mimetypes import shutil from typing import Annotated, Any, List, Optional +import aiofiles +from aiopath import AsyncPath from fastapi import APIRouter, Depends, Header, HTTPException from starlette import status -from starlette.responses import FileResponse +from starlette.responses import StreamingResponse from app import schemas from app.command import Command @@ -12,7 +14,7 @@ from app.core.config import settings from app.core.plugin import PluginManager from app.core.security import verify_apikey, verify_token from app.db.systemconfig_oper import SystemConfigOper -from app.db.user_oper import get_current_active_superuser +from app.db.user_oper import get_current_active_superuser, get_current_active_superuser_async from app.factory import app from app.helper.plugin import PluginHelper from app.log import logger @@ -136,13 +138,14 @@ def register_plugin(plugin_id: str): @router.get("/", summary="所有插件", response_model=List[schemas.Plugin]) -def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser), - state: Optional[str] = "all", force: bool = False) -> List[schemas.Plugin]: +async def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser_async), + state: Optional[str] = "all", force: bool = False) -> List[schemas.Plugin]: """ 查询所有插件清单,包括本地插件和在线插件,插件状态:installed, market, all """ # 本地插件 - local_plugins = PluginManager().get_local_plugins() + plugin_manager = PluginManager() + local_plugins = plugin_manager.get_local_plugins() # 已安装插件 installed_plugins = [plugin for plugin in local_plugins if plugin.installed] if state == "installed": @@ -151,7 +154,7 @@ def all_plugins(_: schemas.TokenPayload = Depends(get_current_active_superuser), # 未安装的本地插件 not_installed_plugins = [plugin for plugin in local_plugins if not plugin.installed] # 在线插件 - online_plugins = PluginManager().get_online_plugins(force) + online_plugins = await plugin_manager.async_get_online_plugins(force) if not online_plugins: # 没有获取在线插件 if state == "market": @@ -192,11 +195,11 @@ def installed(_: schemas.TokenPayload = Depends(get_current_active_superuser)) - @router.get("/statistic", summary="插件安装统计", response_model=dict) -def statistic(_: schemas.TokenPayload = Depends(verify_token)) -> Any: +async def statistic(_: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 插件安装统计 """ - return PluginHelper().get_statistic() + return await PluginHelper().async_get_statistic() @router.get("/reload/{plugin_id}", summary="重新加载插件", response_model=schemas.Response) @@ -222,12 +225,13 @@ def install(plugin_id: str, # 已安装插件 install_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or [] # 首先检查插件是否已经存在,并且是否强制安装,否则只进行安装统计 + plugin_helper = PluginHelper() if not force and plugin_id in PluginManager().get_plugin_ids(): - PluginHelper().install_reg(pid=plugin_id) + plugin_helper.install_reg(pid=plugin_id) else: # 插件不存在或需要强制安装,下载安装并注册插件 if repo_url: - state, msg = PluginHelper().install(pid=plugin_id, repo_url=repo_url) + state, msg = plugin_helper.install(pid=plugin_id, repo_url=repo_url) # 安装失败则直接响应 if not state: return schemas.Response(success=False, message=msg) @@ -260,7 +264,8 @@ def plugin_form(plugin_id: str, """ 根据插件ID获取插件配置表单或Vue组件URL """ - plugin_instance = PluginManager().running_plugins.get(plugin_id) + plugin_manager = PluginManager() + plugin_instance = plugin_manager.running_plugins.get(plugin_id) if not plugin_instance: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"插件 {plugin_id} 不存在或未加载") @@ -271,7 +276,7 @@ def plugin_form(plugin_id: str, return { "render_mode": render_mode, "conf": conf, - "model": PluginManager().get_plugin_config(plugin_id) or model + "model": plugin_manager.get_plugin_config(plugin_id) or model } except Exception as e: logger.error(f"插件 {plugin_id} 调用方法 get_form 出错: {str(e)}") @@ -343,7 +348,7 @@ def reset_plugin(plugin_id: str, @router.get("/file/{plugin_id}/{filepath:path}", summary="获取插件静态文件") -def plugin_static_file(plugin_id: str, filepath: str): +async def plugin_static_file(plugin_id: str, filepath: str): """ 获取插件静态文件 """ @@ -352,11 +357,11 @@ def plugin_static_file(plugin_id: str, filepath: str): logger.warning(f"Static File API: Path traversal attempt detected: {plugin_id}/{filepath}") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden") - plugin_base_dir = settings.ROOT_PATH / "app" / "plugins" / plugin_id.lower() + plugin_base_dir = AsyncPath(settings.ROOT_PATH) / "app" / "plugins" / plugin_id.lower() plugin_file_path = plugin_base_dir / filepath - if not plugin_file_path.exists(): + if not await plugin_file_path.exists(): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{plugin_file_path} 不存在") - if not plugin_file_path.is_file(): + if not await plugin_file_path.is_file(): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"{plugin_file_path} 不是文件") # 判断 MIME 类型 @@ -371,9 +376,20 @@ def plugin_static_file(plugin_id: str, filepath: str): response_type = 'application/octet-stream' try: - return FileResponse(plugin_file_path, media_type=response_type) + # 异步生成器函数,用于流式读取文件 + async def file_generator(): + async with aiofiles.open(plugin_file_path, mode='rb') as file: + # 8KB 块大小 + while chunk := await file.read(8192): + yield chunk + + return StreamingResponse( + file_generator(), + media_type=response_type, + headers={"Content-Disposition": f"inline; filename={plugin_file_path.name}"} + ) except Exception as e: - logger.error(f"Error creating/sending FileResponse for {plugin_file_path}: {e}", exc_info=True) + logger.error(f"Error creating/sending StreamingResponse for {plugin_file_path}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Internal Server Error") @@ -432,7 +448,8 @@ def delete_plugin_folder(folder_name: str, _: schemas.TokenPayload = Depends(get @router.put("/folders/{folder_name}/plugins", summary="更新文件夹中的插件", response_model=schemas.Response) -def update_folder_plugins(folder_name: str, plugin_ids: List[str], _: schemas.TokenPayload = Depends(get_current_active_superuser)) -> Any: +def update_folder_plugins(folder_name: str, plugin_ids: List[str], + _: schemas.TokenPayload = Depends(get_current_active_superuser)) -> Any: """ 更新指定文件夹中的插件列表 """ diff --git a/app/core/plugin.py b/app/core/plugin.py index 9a3f4b99..d824e600 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -10,7 +10,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Any, Dict, List, Optional, Type, Union, Callable, Tuple -from app.helper.sites import SitesHelper # noqa from fastapi import HTTPException from starlette import status from watchdog.events import FileSystemEventHandler @@ -22,6 +21,7 @@ from app.core.event import eventmanager, Event from app.db.plugindata_oper import PluginDataOper from app.db.systemconfig_oper import SystemConfigOper from app.helper.plugin import PluginHelper +from app.helper.sites import SitesHelper # noqa from app.log import logger from app.schemas.types import EventType, SystemConfigKey from app.utils.crypto import RSAUtils @@ -850,8 +850,6 @@ class PluginManager(metaclass=Singleton): if not settings.PLUGIN_MARKET: return [] - # 返回值 - all_plugins = [] # 用于存储高于 v1 版本的插件(如 v2, v3 等) higher_version_plugins = [] # 用于存储 v1 版本插件 @@ -884,25 +882,7 @@ class PluginManager(metaclass=Singleton): else: base_version_plugins.extend(plugins) # 收集 v1 版本插件 - # 优先处理高版本插件 - all_plugins.extend(higher_version_plugins) - # 将未出现在高版本插件列表中的 v1 插件加入 all_plugins - higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins} - all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids]) - # 去重 - all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values()) - # 所有插件按 repo 在设置中的顺序排序 - all_plugins.sort( - key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0 - ) - # 相同 ID 的插件保留版本号最大的版本 - max_versions = {} - for p in all_plugins: - if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]): - max_versions[p.id] = p.plugin_version - result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]] - logger.info(f"共获取到 {len(result)} 个线上插件") - return result + return self._process_plugins_list(higher_version_plugins, base_version_plugins) def get_local_plugins(self) -> List[schemas.Plugin]: """ @@ -1032,83 +1012,215 @@ class PluginManager(metaclass=Singleton): ret_plugins = [] add_time = len(online_plugins) for pid, plugin_info in online_plugins.items(): - if not isinstance(plugin_info, dict): + plugin = self._process_plugin_info(pid, plugin_info, market, installed_apps, add_time, package_version) + if plugin: + ret_plugins.append(plugin) + add_time -= 1 + + return ret_plugins + + @staticmethod + def _process_plugins_list(higher_version_plugins: List[schemas.Plugin], + base_version_plugins: List[schemas.Plugin]) -> List[schemas.Plugin]: + """ + 处理插件列表:合并、去重、排序、保留最高版本 + :param higher_version_plugins: 高版本插件列表 + :param base_version_plugins: 基础版本插件列表 + :return: 处理后的插件列表 + """ + # 优先处理高版本插件 + all_plugins = [] + all_plugins.extend(higher_version_plugins) + # 将未出现在高版本插件列表中的 v1 插件加入 all_plugins + higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins} + all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids]) + # 去重 + all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values()) + # 所有插件按 repo 在设置中的顺序排序 + all_plugins.sort( + key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0 + ) + # 相同 ID 的插件保留版本号最大的版本 + max_versions = {} + for p in all_plugins: + if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]): + max_versions[p.id] = p.plugin_version + result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]] + logger.info(f"共获取到 {len(result)} 个线上插件") + return result + + def _process_plugin_info(self, pid: str, plugin_info: dict, market: str, + installed_apps: List[str], add_time: int, + package_version: Optional[str] = None) -> Optional[schemas.Plugin]: + """ + 处理单个插件信息,创建 schemas.Plugin 对象 + :param pid: 插件ID + :param plugin_info: 插件信息字典 + :param market: 市场URL + :param installed_apps: 已安装插件列表 + :param add_time: 添加顺序 + :param package_version: 包版本 + :return: 创建的插件对象,如果验证失败返回None + """ + if not isinstance(plugin_info, dict): + return None + + # 如 package_version 为空,则需要判断插件是否兼容当前版本 + if not package_version: + if plugin_info.get(settings.VERSION_FLAG) is not True: + # 插件当前版本不兼容 + return None + + # 运行状插件 + plugin_obj = self._running_plugins.get(pid) + # 非运行态插件 + plugin_static = self._plugins.get(pid) + # 基本属性 + plugin = schemas.Plugin() + # ID + plugin.id = pid + # 安装状态 + if pid in installed_apps and plugin_static: + plugin.installed = True + else: + plugin.installed = False + # 是否有新版本 + plugin.has_update = False + if plugin_static: + installed_version = getattr(plugin_static, "plugin_version") + if StringUtils.compare_version(installed_version, "<", plugin_info.get("version")): + # 需要更新 + plugin.has_update = True + # 运行状态 + if plugin_obj and hasattr(plugin_obj, "get_state"): + try: + state = plugin_obj.get_state() + except Exception as e: + logger.error(f"获取插件 {pid} 状态出错:{str(e)}") + state = False + plugin.state = state + else: + plugin.state = False + # 是否有详情页面 + plugin.has_page = False + if plugin_obj and hasattr(plugin_obj, "get_page"): + if ObjectUtils.check_method(plugin_obj.get_page): + plugin.has_page = True + # 公钥 + if plugin_info.get("key"): + plugin.plugin_public_key = plugin_info.get("key") + # 权限 + if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_info): + return None + # 名称 + if plugin_info.get("name"): + plugin.plugin_name = plugin_info.get("name") + # 描述 + if plugin_info.get("description"): + plugin.plugin_desc = plugin_info.get("description") + # 版本 + if plugin_info.get("version"): + plugin.plugin_version = plugin_info.get("version") + # 图标 + if plugin_info.get("icon"): + plugin.plugin_icon = plugin_info.get("icon") + # 标签 + if plugin_info.get("labels"): + plugin.plugin_label = plugin_info.get("labels") + # 作者 + if plugin_info.get("author"): + plugin.plugin_author = plugin_info.get("author") + # 更新历史 + if plugin_info.get("history"): + plugin.history = plugin_info.get("history") + # 仓库链接 + plugin.repo_url = market + # 本地标志 + plugin.is_local = False + # 添加顺序 + plugin.add_time = add_time + + return plugin + + async def async_get_online_plugins(self, force: bool = False) -> List[schemas.Plugin]: + """ + 异步获取所有在线插件信息 + """ + if not settings.PLUGIN_MARKET: + return [] + + # 用于存储高于 v1 版本的插件(如 v2, v3 等) + higher_version_plugins = [] + # 用于存储 v1 版本插件 + base_version_plugins = [] + + # 使用异步并发获取线上插件 + import asyncio + tasks = [] + task_to_version = {} + + for m in settings.PLUGIN_MARKET.split(","): + if not m: continue - # 如 package_version 为空,则需要判断插件是否兼容当前版本 - if not package_version: - if plugin_info.get(settings.VERSION_FLAG) is not True: - # 插件当前版本不兼容 + # 创建任务获取 v1 版本插件 + base_task = asyncio.create_task(self.async_get_plugins_from_market(m, None, force)) + tasks.append(base_task) + task_to_version[base_task] = "base_version" + + # 创建任务获取高版本插件(如 v2、v3) + if settings.VERSION_FLAG: + higher_version_task = asyncio.create_task( + self.async_get_plugins_from_market(m, settings.VERSION_FLAG, force)) + tasks.append(higher_version_task) + task_to_version[higher_version_task] = "higher_version" + + # 并发执行所有任务 + if tasks: + completed_tasks = await asyncio.gather(*tasks, return_exceptions=True) + for i, result in enumerate(completed_tasks): + task = tasks[i] + version = task_to_version[task] + + # 检查是否有异常 + if isinstance(result, Exception): + logger.error(f"获取插件市场数据失败:{str(result)}") continue - # 运行状插件 - plugin_obj = self._running_plugins.get(pid) - # 非运行态插件 - plugin_static = self._plugins.get(pid) - # 基本属性 - plugin = schemas.Plugin() - # ID - plugin.id = pid - # 安装状态 - if pid in installed_apps and plugin_static: - plugin.installed = True - else: - plugin.installed = False - # 是否有新版本 - plugin.has_update = False - if plugin_static: - installed_version = getattr(plugin_static, "plugin_version") - if StringUtils.compare_version(installed_version, "<", plugin_info.get("version")): - # 需要更新 - plugin.has_update = True - # 运行状态 - if plugin_obj and hasattr(plugin_obj, "get_state"): - try: - state = plugin_obj.get_state() - except Exception as e: - logger.error(f"获取插件 {pid} 状态出错:{str(e)}") - state = False - plugin.state = state - else: - plugin.state = False - # 是否有详情页面 - plugin.has_page = False - if plugin_obj and hasattr(plugin_obj, "get_page"): - if ObjectUtils.check_method(plugin_obj.get_page): - plugin.has_page = True - # 公钥 - if plugin_info.get("key"): - plugin.plugin_public_key = plugin_info.get("key") - # 权限 - if not self.__set_and_check_auth_level(plugin=plugin, source=plugin_info): - continue - # 名称 - if plugin_info.get("name"): - plugin.plugin_name = plugin_info.get("name") - # 描述 - if plugin_info.get("description"): - plugin.plugin_desc = plugin_info.get("description") - # 版本 - if plugin_info.get("version"): - plugin.plugin_version = plugin_info.get("version") - # 图标 - if plugin_info.get("icon"): - plugin.plugin_icon = plugin_info.get("icon") - # 标签 - if plugin_info.get("labels"): - plugin.plugin_label = plugin_info.get("labels") - # 作者 - if plugin_info.get("author"): - plugin.plugin_author = plugin_info.get("author") - # 更新历史 - if plugin_info.get("history"): - plugin.history = plugin_info.get("history") - # 仓库链接 - plugin.repo_url = market - # 本地标志 - plugin.is_local = False - # 添加顺序 - plugin.add_time = add_time - # 汇总 - ret_plugins.append(plugin) + + plugins = result + if plugins: + if version == "higher_version": + higher_version_plugins.extend(plugins) # 收集高版本插件 + else: + base_version_plugins.extend(plugins) # 收集 v1 版本插件 + + return self._process_plugins_list(higher_version_plugins, base_version_plugins) + + async def async_get_plugins_from_market(self, market: str, + package_version: Optional[str] = None, + force: bool = False) -> Optional[List[schemas.Plugin]]: + """ + 异步从指定的市场获取插件信息 + :param market: 市场的 URL 或标识 + :param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本 + :param force: 是否强制刷新(忽略缓存) + :return: 返回插件的列表,若获取失败返回 [] + """ + if not market: + return [] + # 已安装插件 + installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or [] + # 获取在线插件 + online_plugins = await PluginHelper().async_get_plugins(market, package_version, force) + if online_plugins is None: + logger.warning( + f"获取{package_version if package_version else ''}插件库失败:{market},请检查 GitHub 网络连接") + return [] + ret_plugins = [] + add_time = len(online_plugins) + for pid, plugin_info in online_plugins.items(): + plugin = self._process_plugin_info(pid, plugin_info, market, installed_apps, add_time, package_version) + if plugin: + ret_plugins.append(plugin) add_time -= 1 return ret_plugins diff --git a/app/helper/plugin.py b/app/helper/plugin.py index 4976e5b5..58f2dfa9 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -7,6 +7,10 @@ import traceback from pathlib import Path from typing import Dict, List, Optional, Tuple, Set +import aiofiles +import aioshutil +import httpx +from aiopath import AsyncPath from packaging.specifiers import SpecifierSet, InvalidSpecifier from packaging.version import Version, InvalidVersion from pkg_resources import Requirement, working_set @@ -17,7 +21,7 @@ from app.core.config import settings from app.db.systemconfig_oper import SystemConfigOper from app.log import logger from app.schemas.types import SystemConfigKey -from app.utils.http import RequestUtils +from app.utils.http import RequestUtils, AsyncRequestUtils from app.utils.singleton import WeakSingleton from app.utils.system import SystemUtils from app.utils.url import UrlUtils @@ -95,7 +99,8 @@ class PluginHelper(metaclass=WeakSingleton): return None return {} - def get_plugin_package_version(self, pid: str, repo_url: str, package_version: Optional[str] = None) -> Optional[str]: + def get_plugin_package_version(self, pid: str, repo_url: str, + package_version: Optional[str] = None) -> Optional[str]: """ 检查并获取指定插件的可用版本,支持多版本优先级加载和版本兼容性检测 1. 如果未指定版本,则使用系统配置的默认版本(通过 settings.VERSION_FLAG 设置) @@ -743,3 +748,602 @@ class PluginHelper(metaclass=WeakSingleton): :return: 标准化后的包名 """ return name.lower().replace("-", "_") if name else name + + async def async_get_plugin_package_version(self, pid: str, repo_url: str, + package_version: Optional[str] = None) -> Optional[str]: + """ + 异步版本的获取插件版本方法,功能同 get_plugin_package_version + """ + if not package_version: + package_version = settings.VERSION_FLAG + + if pid in (await self.async_get_plugins(repo_url, package_version) or []): + return package_version + + plugin = (await self.async_get_plugins(repo_url) or {}).get(pid, None) + if plugin and plugin.get(package_version) is True: + return "" + + return None + + @staticmethod + async def __async_request_with_fallback(url: str, + headers: Optional[dict] = None, + timeout: Optional[int] = 60, + is_api: bool = False) -> Optional[httpx.Response]: + """ + 使用自动降级策略,异步请求资源,优先级依次为镜像站、代理、直连 + :param url: 目标URL + :param headers: 请求头信息 + :param timeout: 请求超时时间 + :param is_api: 是否为GitHub API请求,API请求不走镜像站 + :return: 请求成功则返回 Response,失败返回 None + """ + strategies = [] + + # 1. 尝试使用镜像站,镜像站一般不支持API请求,因此API请求直接跳过镜像站 + if not is_api and settings.GITHUB_PROXY: + proxy_url = f"{UrlUtils.standardize_base_url(settings.GITHUB_PROXY)}{url}" + strategies.append(("镜像站", proxy_url, {"headers": headers, "timeout": timeout})) + + # 2. 尝试使用代理 + if settings.PROXY_HOST: + strategies.append(("代理", url, {"headers": headers, "proxies": settings.PROXY, "timeout": timeout})) + + # 3. 最后尝试直连 + strategies.append(("直连", url, {"headers": headers, "timeout": timeout})) + + # 遍历策略并尝试请求 + for strategy_name, target_url, request_params in strategies: + logger.debug(f"[GitHub] 尝试使用策略:{strategy_name} 请求 URL:{target_url}") + + try: + res = await AsyncRequestUtils(**request_params).get_res(url=target_url, raise_exception=True) + logger.debug(f"[GitHub] 请求成功,策略:{strategy_name}, URL: {target_url}") + return res + except Exception as e: + logger.error(f"[GitHub] 请求失败,策略:{strategy_name}, URL: {target_url},错误:{str(e)}") + + logger.error(f"[GitHub] 所有策略均请求失败,URL: {url},请检查网络连接或 GitHub 配置") + return None + + async def async_get_plugins(self, repo_url: str, package_version: Optional[str] = None, + force: bool = False) -> Optional[Dict[str, dict]]: + """ + 异步获取Github所有最新插件列表 + :param repo_url: Github仓库地址 + :param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本 + :param force: 是否强制刷新,忽略缓存 + """ + # 异步版本直接调用不带缓存的版本(缓存在异步环境下可能有并发问题) + if force: + return await self._async_get_plugins_uncached(repo_url, package_version) + return await self._async_get_plugins_cached(repo_url, package_version) + + @cached(maxsize=64, ttl=1800) + async def _async_get_plugins_cached(self, repo_url: str, + package_version: Optional[str] = None) -> Optional[Dict[str, dict]]: + """ + 获取Github所有最新插件列表(使用缓存) + :param repo_url: Github仓库地址 + :param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本 + """ + return await self._async_get_plugins_uncached(repo_url, package_version) + + async def _async_get_plugins_uncached(self, repo_url: str, + package_version: Optional[str] = None) -> Optional[Dict[str, dict]]: + """ + 异步获取Github所有最新插件列表(不使用缓存) + :param repo_url: Github仓库地址 + :param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本 + """ + if not repo_url: + return None + + user, repo = self.get_repo_info(repo_url) + if not user or not repo: + return None + + raw_url = self._base_url.format(user=user, repo=repo) + package_url = f"{raw_url}package.{package_version}.json" if package_version else f"{raw_url}package.json" + + res = await self.__async_request_with_fallback(package_url, + headers=settings.REPO_GITHUB_HEADERS(repo=f"{user}/{repo}")) + if res is None: + return None + if res: + content = res.text + try: + return json.loads(content) + except json.JSONDecodeError: + if "404: Not Found" not in content: + logger.warn(f"插件包数据解析失败:{content}") + return None + return {} + + async def async_get_statistic(self) -> Dict: + """ + 异步获取插件安装统计 + """ + if not settings.PLUGIN_STATISTIC_SHARE: + return {} + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=10).get_res(self._install_statistic) + if res and res.status_code == 200: + return res.json() + return {} + + async def async_install_reg(self, pid: str) -> bool: + """ + 异步安装插件统计 + """ + if not settings.PLUGIN_STATISTIC_SHARE: + return False + if not pid: + return False + install_reg_url = self._install_reg.format(pid=pid) + res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5).get_res(install_reg_url) + if res and res.status_code == 200: + return True + return False + + async def async_install_report(self) -> bool: + """ + 异步上报存量插件安装统计 + """ + if not settings.PLUGIN_STATISTIC_SHARE: + return False + plugins = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins) + if not plugins: + return False + res = await AsyncRequestUtils(proxies=settings.PROXY, + content_type="application/json", + timeout=5).post(self._install_report, + json={"plugins": [{"plugin_id": plugin} for plugin in plugins]}) + return True if res else False + + async def __async_get_file_list(self, pid: str, user_repo: str, package_version: Optional[str] = None) -> \ + Tuple[Optional[list], Optional[str]]: + """ + 异步获取插件的文件列表 + :param pid: 插件 ID + :param user_repo: GitHub 仓库的 user/repo 路径 + :return: (文件列表, 错误信息) + """ + file_api = f"https://api.github.com/repos/{user_repo}/contents/plugins" + # 如果 package_version 存在(如 "v2"),则加上版本号 + if package_version: + file_api += f".{package_version}" + file_api += f"/{pid}" + + res = await self.__async_request_with_fallback(file_api, + headers=settings.REPO_GITHUB_HEADERS(repo=user_repo), + is_api=True, + timeout=30) + if res is None: + return None, "连接仓库失败" + elif res.status_code != 200: + return None, f"连接仓库失败:{res.status_code} - " \ + f"{'超出速率限制,请设置Github Token或稍后重试' if res.status_code == 403 else res.text}" + + try: + ret = res.json() + if isinstance(ret, list) and len(ret) > 0 and "message" not in ret[0]: + return ret, "" + else: + return None, "插件在仓库中不存在或返回数据格式不正确" + except Exception as e: + logger.error(f"插件数据解析失败:{e}") + return None, "插件数据解析失败" + + async def __async_download_files(self, pid: str, file_list: List[dict], user_repo: str, + package_version: Optional[str] = None, + skip_requirements: bool = False) -> Tuple[bool, str]: + """ + 异步下载插件文件 + :param pid: 插件 ID + :param file_list: 要下载的文件列表,包含文件的元数据(包括下载链接) + :param user_repo: GitHub 仓库的 user/repo 路径 + :param skip_requirements: 是否跳过 requirements.txt 文件的下载 + :return: (是否成功, 错误信息) + """ + if not file_list: + return False, "文件列表为空" + + # 使用栈结构来替代递归调用,避免递归深度过大问题 + stack = [(pid, file_list)] + + while stack: + current_pid, current_file_list = stack.pop() + + for item in current_file_list: + # 跳过 requirements.txt 的下载 + if skip_requirements and item.get("name") == "requirements.txt": + continue + + if item.get("download_url"): + logger.debug(f"正在下载文件:{item.get('path')}") + res = await self.__async_request_with_fallback(item.get('download_url'), + headers=settings.REPO_GITHUB_HEADERS(repo=user_repo)) + if not res: + return False, f"文件 {item.get('path')} 下载失败!" + elif res.status_code != 200: + return False, f"下载文件 {item.get('path')} 失败:{res.status_code}" + + # 确保文件路径不包含版本号(如 v2、v3),如果有 package_version,移除路径中的版本号 + relative_path = item.get("path") + if package_version: + relative_path = relative_path.replace(f"plugins.{package_version}", "plugins", 1) + + # 创建插件文件夹并写入文件 + file_path = AsyncPath(settings.ROOT_PATH) / "app" / relative_path + await file_path.parent.mkdir(parents=True, exist_ok=True) + async with aiofiles.open(file_path, "w", encoding="utf-8") as f: + await f.write(res.text) + logger.debug(f"文件 {item.get('path')} 下载成功,保存路径:{file_path}") + else: + # 如果是子目录,则将子目录内容加入栈中继续处理 + sub_list, msg = await self.__async_get_file_list(f"{current_pid}/{item.get('name')}", user_repo, + package_version) + if not sub_list: + return False, msg + stack.append((f"{current_pid}/{item.get('name')}", sub_list)) + + return True, "" + + async def __async_download_and_install_requirements(self, requirements_file_info: dict, pid: str, user_repo: str) \ + -> Tuple[bool, str]: + """ + 异步下载并安装 requirements.txt 文件中的依赖 + :param requirements_file_info: requirements.txt 文件的元数据信息 + :param pid: 插件 ID + :param user_repo: GitHub 仓库的 user/repo 路径 + :return: (是否成功, 错误信息) + """ + # 下载 requirements.txt + res = await self.__async_request_with_fallback(requirements_file_info.get("download_url"), + headers=settings.REPO_GITHUB_HEADERS(repo=user_repo)) + if not res: + return False, "requirements.txt 文件下载失败" + elif res.status_code != 200: + return False, f"下载 requirements.txt 文件失败:{res.status_code}" + + requirements_txt = res.text + if requirements_txt.strip(): + # 保存并安装依赖 + requirements_file_path = AsyncPath(PLUGIN_DIR) / pid.lower() / "requirements.txt" + await requirements_file_path.parent.mkdir(parents=True, exist_ok=True) + async with aiofiles.open(requirements_file_path, "w", encoding="utf-8") as f: + await f.write(requirements_txt) + + return self.pip_install_with_fallback(Path(requirements_file_path)) + + return True, "" # 如果 requirements.txt 为空,视作成功 + + async def __async_backup_plugin(self, pid: str) -> str: + """ + 异步备份旧插件目录 + :param pid: 插件 ID + :return: 备份目录路径 + """ + plugin_dir = AsyncPath(PLUGIN_DIR) / pid + backup_dir = AsyncPath(settings.TEMP_PATH) / "plugin_backup" / pid + + if await plugin_dir.exists(): + # 备份时清理已有的备份目录,防止残留文件影响 + if await backup_dir.exists(): + await aioshutil.rmtree(backup_dir, ignore_errors=True) + logger.debug(f"{pid} 旧的备份目录已清理 {backup_dir}") + + # 异步复制目录 + await self._async_copytree(plugin_dir, backup_dir) + logger.debug(f"{pid} 插件已备份到 {backup_dir}") + + return str(backup_dir) if await backup_dir.exists() else None + + @staticmethod + async def __async_restore_plugin(pid: str, backup_dir: str): + """ + 异步还原旧插件目录 + :param pid: 插件 ID + :param backup_dir: 备份目录路径 + """ + plugin_dir = AsyncPath(PLUGIN_DIR) / pid + if await plugin_dir.exists(): + await aioshutil.rmtree(plugin_dir, ignore_errors=True) + logger.debug(f"{pid} 已清理插件目录 {plugin_dir}") + + backup_path = AsyncPath(backup_dir) + if await backup_path.exists(): + await PluginHelper._async_copytree(backup_path, plugin_dir) + logger.debug(f"{pid} 已还原插件目录 {plugin_dir}") + await aioshutil.rmtree(backup_path, ignore_errors=True) + logger.debug(f"{pid} 已删除备份目录 {backup_dir}") + + @staticmethod + async def __async_remove_old_plugin(pid: str): + """ + 异步删除旧插件 + :param pid: 插件 ID + """ + plugin_dir = AsyncPath(PLUGIN_DIR) / pid + if await plugin_dir.exists(): + await aioshutil.rmtree(plugin_dir, ignore_errors=True) + + async def _async_copytree(self, src: AsyncPath, dst: AsyncPath): + """ + 异步递归复制目录 + :param src: 源目录 + :param dst: 目标目录 + """ + if not await src.exists(): + return + + await dst.mkdir(parents=True, exist_ok=True) + + async for item in src.iterdir(): + dst_item = dst / item.name + if await item.is_dir(): + await self._async_copytree(item, dst_item) + else: + async with aiofiles.open(item, 'rb') as src_file: + content = await src_file.read() + async with aiofiles.open(dst_item, 'wb') as dst_file: + await dst_file.write(content) + + async def __async_install_dependencies_if_required(self, pid: str) -> Tuple[bool, bool, str]: + """ + 异步安装插件依赖。 + :param pid: 插件 ID + :return: (是否存在依赖,安装是否成功, 错误信息) + """ + # 定位插件目录和依赖文件 + plugin_dir = AsyncPath(PLUGIN_DIR) / pid.lower() + requirements_file = plugin_dir / "requirements.txt" + + # 检查是否存在 requirements.txt 文件 + if await requirements_file.exists(): + logger.info(f"{pid} 存在依赖,开始尝试安装依赖") + success, error_message = self.pip_install_with_fallback(Path(requirements_file)) + if success: + return True, True, "" + else: + return True, False, error_message + + return False, False, "不存在依赖" + + async def async_install_dependencies(self, dependencies: List[str]) -> Tuple[bool, str]: + """ + 异步安装指定的依赖项列表 + :param dependencies: 需要安装或更新的依赖项列表 + :return: (success, message) + """ + if not dependencies: + return False, "没有传入需要安装的依赖项" + + try: + logger.debug(f"需要安装或更新的依赖项:{dependencies}") + # 创建临时的 requirements.txt 文件用于批量安装 + requirements_temp_file = AsyncPath(settings.TEMP_PATH) / "plugin_dependencies" / "requirements.txt" + await requirements_temp_file.parent.mkdir(parents=True, exist_ok=True) + + async with aiofiles.open(requirements_temp_file, "w", encoding="utf-8") as f: + for dep in dependencies: + await f.write(dep + "\n") + + try: + # 使用自动降级策略安装依赖 + return self.pip_install_with_fallback(Path(requirements_temp_file)) + finally: + # 删除临时文件 + await requirements_temp_file.unlink() + except Exception as e: + logger.error(f"安装依赖项时发生错误:{e}") + return False, f"安装依赖项时发生错误:{e}" + + async def __async_find_plugin_dependencies(self) -> Dict[str, str]: + """ + 异步收集所有插件的依赖项 + 遍历 plugins 目录下的所有插件,查找存在 requirements.txt 的插件目录 + ,并解析其中的依赖项,同时将所有插件的依赖项合并到字典中,方便后续统一处理 + :return: 依赖项字典,格式为 {package_name: set(version_specifiers)} + """ + dependencies = {} + try: + install_plugins = { + plugin_id.lower() # 对应插件的小写目录名 + for plugin_id in SystemConfigOper().get( + SystemConfigKey.UserInstalledPlugins + ) or [] + } + + plugin_dir_path = AsyncPath(PLUGIN_DIR) + async for plugin_dir in plugin_dir_path.iterdir(): + if await plugin_dir.is_dir(): + requirements_file = plugin_dir / "requirements.txt" + if await requirements_file.exists(): + if plugin_dir.name not in install_plugins: + # 这个插件不在安装列表中 忽略它的依赖 + logger.debug(f"忽略插件 {plugin_dir.name} 的依赖") + continue + # 解析当前插件的 requirements.txt,获取依赖项 + plugin_deps = await self.__async_parse_requirements(requirements_file) + for pkg_name, version_specifiers in plugin_deps.items(): + if pkg_name in dependencies: + # 更新已存在的包的版本约束集合 + dependencies[pkg_name].update(version_specifiers) + else: + # 添加新的包及其版本约束 + dependencies[pkg_name] = set(version_specifiers) + return self.__merge_dependencies(dependencies) + except Exception as e: + logger.error(f"收集插件依赖项时发生错误:{e}") + return {} + + async def __async_parse_requirements(self, requirements_file: AsyncPath) -> Dict[str, List[str]]: + """ + 异步解析 requirements.txt 文件,返回依赖项字典 + 使用 packaging 库解析每一行依赖项,提取包名和版本约束 + 对于无法解析的行,记录警告日志,便于后续检查 + :param requirements_file: requirements.txt 文件的路径 + :return: 依赖项字典,格式为 {package_name: [version_specifier]} + """ + dependencies = {} + try: + async with aiofiles.open(requirements_file, "r", encoding="utf-8") as f: + async for line in f: + line = str(line).strip() + if line and not line.startswith('#'): + # 使用 packaging 库解析依赖项 + try: + req = Requirement(line) + pkg_name = self.__standardize_pkg_name(req.name) + version_specifier = str(req.specifier) + if pkg_name in dependencies: + dependencies[pkg_name].append(version_specifier) + else: + dependencies[pkg_name] = [version_specifier] + except Exception as e: + logger.debug(f"无法解析依赖项 '{line}':{e}") + return dependencies + except Exception as e: + logger.error(f"解析 requirements.txt 时发生错误:{e}") + return {} + + async def async_find_missing_dependencies(self) -> List[str]: + """ + 异步收集所有需要安装或更新的依赖项 + 1. 收集所有插件的依赖项,合并版本约束 + 2. 获取已安装的包及其版本 + 3. 比较已安装的包与所需的依赖项,找出需要安装或升级的包 + :return: 需要安装或更新的依赖项列表,例如 ["package1>=1.0.0", "package2"] + """ + try: + # 收集所有插件的依赖项 + plugin_dependencies = await self.__async_find_plugin_dependencies() # 返回格式为 {package_name: version_specifier} + # 获取已安装的包及其版本 + installed_packages = self.__get_installed_packages() # 返回格式为 {package_name: Version} + # 需要安装或更新的依赖项列表 + dependencies_to_install = [] + for pkg_name, version_specifier in plugin_dependencies.items(): + spec_set = SpecifierSet(version_specifier) + installed_version = installed_packages.get(pkg_name) + if installed_version is None: + # 包未安装,需要安装 + if version_specifier: + dependencies_to_install.append(f"{pkg_name}{version_specifier}") + else: + dependencies_to_install.append(pkg_name) + elif not spec_set.contains(installed_version, prereleases=True): + # 已安装的版本不满足版本约束,需要升级或降级 + if version_specifier: + dependencies_to_install.append(f"{pkg_name}{version_specifier}") + else: + dependencies_to_install.append(pkg_name) + # 已安装的版本满足要求,无需操作 + return dependencies_to_install + except Exception as e: + logger.error(f"收集所有需要安装或更新的依赖项时发生错误:{e}") + return [] + + async def async_install(self, pid: str, repo_url: str, package_version: Optional[str] = None, + force_install: bool = False) -> Tuple[bool, str]: + """ + 异步安装插件,包括依赖安装和文件下载,相关资源支持自动降级策略 + 1. 检查并获取插件的指定版本,确认版本兼容性 + 2. 从 GitHub 获取文件列表(包括 requirements.txt) + 3. 删除旧的插件目录(如非强制安装则进行备份) + 4. 下载并预安装 requirements.txt 中的依赖(如果存在) + 5. 下载并安装插件的其他文件 + 6. 再次尝试安装依赖(确保安装完整) + :param pid: 插件 ID + :param repo_url: 插件仓库地址 + :param package_version: 首选插件版本 (如 "v2", "v3"),如不指定则默认使用系统配置的版本 + :param force_install: 是否强制安装插件,默认不启用,启用时不进行备份和恢复操作 + :return: (是否成功, 错误信息) + """ + if SystemUtils.is_frozen(): + return False, "可执行文件模式下,只能安装本地插件" + + # 验证参数 + if not pid or not repo_url: + return False, "参数错误" + + # 从 GitHub 的 repo_url 获取用户和项目名 + user, repo = self.get_repo_info(repo_url) + if not user or not repo: + return False, "不支持的插件仓库地址格式" + + user_repo = f"{user}/{repo}" + + if not package_version: + package_version = settings.VERSION_FLAG + + # 1. 优先检查指定版本的插件 + package_version = await self.async_get_plugin_package_version(pid, repo_url, package_version) + # 如果 package_version 为None,说明没有找到匹配的插件 + if package_version is None: + msg = f"{pid} 没有找到适用于当前版本的插件" + logger.debug(msg) + return False, msg + # package_version 为空,表示从 package.json 中找到插件 + elif package_version == "": + logger.debug(f"{pid} 从 package.json 中找到适用于当前版本的插件") + else: + logger.debug(f"{pid} 从 package.{package_version}.json 中找到适用于当前版本的插件") + + # 2. 获取插件文件列表(包括 requirements.txt) + file_list, msg = await self.__async_get_file_list(pid.lower(), user_repo, package_version) + if not file_list: + return False, msg + + # 3. 删除旧的插件目录,如果不强制安装则备份 + backup_dir = None + if not force_install: + backup_dir = await self.__async_backup_plugin(pid.lower()) + + await self.__async_remove_old_plugin(pid.lower()) + + # 4. 查找并安装 requirements.txt 中的依赖,确保插件环境的依赖尽可能完整。依赖安装可能失败且不影响插件安装,目前只记录日志 + requirements_file_info = next((f for f in file_list if f.get("name") == "requirements.txt"), None) + if requirements_file_info: + logger.debug(f"{pid} 发现 requirements.txt,提前下载并预安装依赖") + success, message = await self.__async_download_and_install_requirements(requirements_file_info, + pid, user_repo) + if not success: + logger.debug(f"{pid} 依赖预安装失败:{message}") + else: + logger.debug(f"{pid} 依赖预安装成功") + + # 5. 下载插件的其他文件 + logger.info(f"{pid} 准备开始下载插件文件") + success, message = await self.__async_download_files(pid.lower(), file_list, user_repo, package_version, True) + if not success: + logger.error(f"{pid} 下载插件文件失败:{message}") + if backup_dir: + await self.__async_restore_plugin(pid.lower(), backup_dir) + logger.warning(f"{pid} 插件安装失败,已还原备份插件") + else: + await self.__async_remove_old_plugin(pid.lower()) + logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装") + + return False, message + else: + logger.info(f"{pid} 下载插件文件成功") + + # 6. 插件文件安装成功后,再次尝试安装依赖,避免因为遗漏依赖导致的插件运行问题,目前依旧只记录日志 + dependencies_exist, success, message = await self.__async_install_dependencies_if_required(pid) + if dependencies_exist: + if not success: + logger.error(f"{pid} 依赖安装失败:{message}") + if backup_dir: + await self.__async_restore_plugin(pid.lower(), backup_dir) + logger.warning(f"{pid} 插件安装失败,已还原备份插件") + else: + await self.__async_remove_old_plugin(pid.lower()) + logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装") + else: + logger.info(f"{pid} 依赖安装成功") + + # 插件安装成功后,统计安装信息 + await self.async_install_reg(pid) + return True, "" diff --git a/requirements.in b/requirements.in index caf7abda..2e9a297b 100644 --- a/requirements.in +++ b/requirements.in @@ -7,6 +7,7 @@ passlib~=1.7.4 PyJWT~=2.10.1 python-multipart~=0.0.9 aiofiles~=24.1.0 +aioshutil~=1.5 alembic~=1.16.2 bcrypt~=4.0.1 regex~=2024.11.6