From 86dec5aec24b75c4244c28f00a9ffb3a94cedd1b Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Thu, 24 Oct 2024 02:55:54 +0800 Subject: [PATCH] feat(setup): complete missing dependencies installation --- app/core/plugin.py | 22 ++++ app/helper/plugin.py | 200 ++++++++++++++++++++++++++++- app/startup/plugins_initializer.py | 26 +++- 3 files changed, 239 insertions(+), 9 deletions(-) diff --git a/app/core/plugin.py b/app/core/plugin.py index 068cdd0d..b34aad9d 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -331,6 +331,28 @@ class PluginManager(metaclass=Singleton): ) return sync_plugins + def install_plugin_missing_dependencies(self) -> List[str]: + """ + 安装插件中缺失或不兼容的依赖项 + + :return: (success, message) + """ + # return [] + # 第一步:获取需要安装的依赖项列表 + missing_dependencies = self.pluginhelper.find_missing_dependencies() + if not missing_dependencies: + return missing_dependencies + logger.info(f"开始安装缺失的依赖项,共 {len(missing_dependencies)} 个...") + # 第二步:安装依赖项并返回结果 + total_start_time = time.time() + success, message = self.pluginhelper.install_dependencies(missing_dependencies) + total_elapsed_time = time.time() - total_start_time + if success: + logger.info(f"已完成 {len(missing_dependencies)} 个依赖项安装,总耗时:{total_elapsed_time:.2f} 秒") + else: + logger.warning(f"存在缺失依赖项安装失败,请尝试手动安装,总耗时:{total_elapsed_time:.2f} 秒") + return missing_dependencies + def get_plugin_config(self, pid: str) -> dict: """ 获取插件配置 diff --git a/app/helper/plugin.py b/app/helper/plugin.py index a9a2e7c5..0d92173c 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -2,9 +2,13 @@ import json import shutil import traceback from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Set +import pkg_resources from cachetools import TTLCache, cached +from packaging.specifiers import SpecifierSet, InvalidSpecifier +from packaging.version import Version, InvalidVersion +from pkg_resources import Requirement from app.core.config import settings from app.db.systemconfig_oper import SystemConfigOper @@ -15,6 +19,8 @@ from app.utils.singleton import Singleton from app.utils.system import SystemUtils from app.utils.url import UrlUtils +PLUGIN_DIR = Path(settings.ROOT_PATH) / "app" / "plugins" + class PluginHelper(metaclass=Singleton): """ @@ -359,7 +365,7 @@ class PluginHelper(metaclass=Singleton): requirements_txt = res.text if requirements_txt.strip(): # 保存并安装依赖 - requirements_file_path = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower() / "requirements.txt" + requirements_file_path = PLUGIN_DIR / pid.lower() / "requirements.txt" requirements_file_path.parent.mkdir(parents=True, exist_ok=True) with open(requirements_file_path, "w", encoding="utf-8") as f: f.write(requirements_txt) @@ -376,7 +382,7 @@ class PluginHelper(metaclass=Singleton): :return: (是否存在依赖,安装是否成功, 错误信息) """ # 定位插件目录和依赖文件 - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower() + plugin_dir = PLUGIN_DIR / pid.lower() requirements_file = plugin_dir / "requirements.txt" # 检查是否存在 requirements.txt 文件 @@ -397,7 +403,7 @@ class PluginHelper(metaclass=Singleton): :param pid: 插件 ID :return: 备份目录路径 """ - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + plugin_dir = PLUGIN_DIR / pid backup_dir = Path(settings.TEMP_PATH) / "plugins_backup" / pid if plugin_dir.exists(): @@ -418,7 +424,7 @@ class PluginHelper(metaclass=Singleton): :param pid: 插件 ID :param backup_dir: 备份目录路径 """ - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + plugin_dir = PLUGIN_DIR / pid if plugin_dir.exists(): shutil.rmtree(plugin_dir, ignore_errors=True) logger.debug(f"{pid} 已清理插件目录 {plugin_dir}") @@ -435,7 +441,7 @@ class PluginHelper(metaclass=Singleton): 删除旧插件 :param pid: 插件 ID """ - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + plugin_dir = PLUGIN_DIR / pid if plugin_dir.exists(): shutil.rmtree(plugin_dir, ignore_errors=True) @@ -560,3 +566,185 @@ class PluginHelper(metaclass=Singleton): logger.error(f"[GitHub] 所有策略均请求失败,URL: {url},请检查网络连接或 GitHub 配置") return None + + def find_missing_dependencies(self) -> List[str]: + """ + 收集所有需要安装或更新的依赖项 + 1. 收集所有插件的依赖项,合并版本约束 + 2. 获取已安装的包及其版本 + 3. 比较已安装的包与所需的依赖项,找出需要安装或升级的包 + :return: 需要安装或更新的依赖项列表,例如 ["package1>=1.0.0", "package2"] + """ + try: + # 收集所有插件的依赖项 + missing_dependencies = self.__find_plugin_dependencies() # 返回格式为 {package_name: version_specifier} + # 获取已安装的包及其版本 + installed_packages = self.__get_installed_packages() # 返回格式为 {package_name: Version} + # 需要安装或更新的依赖项列表 + dependencies_to_install = [] + for pkg_name, version_specifier in missing_dependencies.items(): + spec_set = SpecifierSet(version_specifier) + installed_version = installed_packages.get(pkg_name) + if installed_version is None: + # 包未安装,需要安装 + if version_specifier: + dependencies_to_install.append(f"{pkg_name}{version_specifier}") + else: + dependencies_to_install.append(pkg_name) + elif not spec_set.contains(installed_version, prereleases=True): + # 已安装的版本不满足版本约束,需要升级或降级 + if version_specifier: + dependencies_to_install.append(f"{pkg_name}{version_specifier}") + else: + dependencies_to_install.append(pkg_name) + # 已安装的版本满足要求,无需操作 + return dependencies_to_install + except Exception as e: + logger.error(f"收集所有需要安装或更新的依赖项时发生错误:{e}") + return [] + + def install_dependencies(self, dependencies: List[str]) -> Tuple[bool, str]: + """ + 安装指定的依赖项列表 + + :param dependencies: 需要安装或更新的依赖项列表 + :return: (success, message) + """ + if not dependencies: + return False, "没有传入需要安装的依赖项" + + try: + logger.debug(f"需要安装或更新的依赖项:{dependencies}") + # 创建临时的 requirements.txt 文件用于批量安装 + requirements_temp_file = Path(settings.TEMP_PATH) / "plugin_dependencies" / "requirements.txt" + requirements_temp_file.parent.mkdir(parents=True, exist_ok=True) + with open(requirements_temp_file, "w", encoding="utf-8") as f: + for dep in dependencies: + f.write(dep + '\n') + + # 使用自动降级策略安装依赖 + success, message = self.__pip_install_with_fallback(requirements_temp_file) + # 删除临时文件 + requirements_temp_file.unlink() + return success, message + except Exception as e: + logger.error(f"安装依赖项时发生错误:{e}") + return False, f"安装依赖项时发生错误:{e}" + + def __get_installed_packages(self) -> Dict[str, Version]: + """ + 获取已安装的包及其版本 + 使用 pkg_resources 获取当前环境中已安装的包,标准化包名并转换版本信息 + 对于无法解析的版本,记录警告日志并跳过 + :return: 已安装包的字典,格式为 {package_name: Version} + """ + installed_packages = {} + try: + for dist in pkg_resources.working_set: + pkg_name = self.__standardize_pkg_name(dist.project_name) + try: + installed_packages[pkg_name] = Version(dist.version) + except InvalidVersion: + logger.debug(f"无法解析已安装包 '{pkg_name}' 的版本:{dist.version}") + continue + return installed_packages + except Exception as e: + logger.error(f"获取已安装的包时发生错误:{e}") + return {} + + def __find_plugin_dependencies(self) -> Dict[str, str]: + """ + 收集所有插件的依赖项 + 遍历 plugins 目录下的所有插件,查找存在 requirements.txt 的插件目录 + ,并解析其中的依赖项,同时将所有插件的依赖项合并到字典中,方便后续统一处理 + :return: 依赖项字典,格式为 {package_name: set(version_specifiers)} + """ + dependencies = {} + try: + for plugin_dir in PLUGIN_DIR.iterdir(): + if plugin_dir.is_dir(): + requirements_file = plugin_dir / "requirements.txt" + if requirements_file.exists(): + # 解析当前插件的 requirements.txt,获取依赖项 + plugin_deps = self.__parse_requirements(requirements_file) + for pkg_name, version_specifiers in plugin_deps.items(): + logger.debug(f"当前处理的包:{pkg_name}, 版本约束:{version_specifiers}") + if pkg_name in dependencies: + # 更新已存在的包的版本约束集合 + dependencies[pkg_name].update(version_specifiers) + else: + # 添加新的包及其版本约束 + dependencies[pkg_name] = set(version_specifiers) + return self.__merge_dependencies(dependencies) + except Exception as e: + logger.error(f"收集插件依赖项时发生错误:{e}") + return {} + + def __parse_requirements(self, requirements_file: Path) -> Dict[str, List[str]]: + """ + 解析 requirements.txt 文件,返回依赖项字典 + 使用 packaging 库解析每一行依赖项,提取包名和版本约束 + 对于无法解析的行,记录警告日志,便于后续检查 + :param requirements_file: requirements.txt 文件的路径 + :return: 依赖项字典,格式为 {package_name: [version_specifier]} + """ + dependencies = {} + try: + with open(requirements_file, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + # 使用 packaging 库解析依赖项 + try: + req = Requirement(line) + pkg_name = self.__standardize_pkg_name(req.name) + version_specifier = str(req.specifier) + logger.debug(f"解析到依赖项:包名={pkg_name}, 版本约束={version_specifier}") + if pkg_name in dependencies: + dependencies[pkg_name].append(version_specifier) + else: + dependencies[pkg_name] = [version_specifier] + except Exception as e: + logger.debug(f"无法解析依赖项 '{line}':{e}") + return dependencies + except Exception as e: + logger.error(f"解析 requirements.txt 时发生错误:{e}") + return {} + + @staticmethod + def __merge_dependencies(dependencies: Dict[str, Set[str]]) -> Dict[str, str]: + """ + 合并依赖项,选择每个包的最高版本要求 + 对于多个插件依赖同一包的情况,合并其版本约束,取交集以满足所有插件的要求 + 如果交集为空,表示存在版本冲突,需要根据策略进行处理 + :param dependencies: 依赖项字典,格式为 {package_name: set(version_specifiers)} + :return: 合并后的依赖项字典,格式为 {package_name: version_specifiers} + """ + try: + merged_dependencies = {} + for pkg_name, version_specifiers in dependencies.items(): + logger.debug(f"合并包:{pkg_name} 的版本约束:{version_specifiers}") + # 合并版本约束 + spec_set = SpecifierSet() + for specifier in version_specifiers: + try: + if specifier: + spec_set &= SpecifierSet(specifier) + except InvalidSpecifier as e: + logger.error(f"发生版本约束冲突:{e}") + # 将合并后的版本约束添加到结果字典 + merged_dependencies[pkg_name] = str(spec_set) if spec_set else '' + return merged_dependencies + except Exception as e: + logger.error(f"合并依赖项时发生错误:{e}") + return {} + + @staticmethod + def __standardize_pkg_name(name: str) -> str: + """ + 标准化包名,将包名转换为小写并将连字符替换为下划线 + + :param name: 原始包名 + :return: 标准化后的包名 + """ + return name.lower().replace("-", "_") if name else name diff --git a/app/startup/plugins_initializer.py b/app/startup/plugins_initializer.py index 23494732..e1b270ee 100644 --- a/app/startup/plugins_initializer.py +++ b/app/startup/plugins_initializer.py @@ -15,11 +15,18 @@ async def init_plugins_async(): plugin_manager = PluginManager() scheduler = Scheduler() command = CommandChain() - sync_plugins = await loop.run_in_executor(None, plugin_manager.sync) - if not sync_plugins: + + sync_result = await execute_task(loop, plugin_manager.sync, "插件同步到本地") + resolved_dependencies = await execute_task(loop, plugin_manager.install_plugin_missing_dependencies, + "缺失依赖项安装") + # 判断是否需要进行插件初始化 + if not sync_result and not resolved_dependencies: + logger.debug("没有新的插件同步到本地或缺失依赖项需要安装,跳过插件初始化") return + + # 继续执行后续的插件初始化步骤 + logger.info("正在初始化所有插件") # 为避免初始化插件异常,这里所有插件都进行初始化 - logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件") # 安装完成后重新初始化插件 plugin_manager.init_config() # 插件启动后注册后台任务 @@ -33,6 +40,19 @@ async def init_plugins_async(): logger.error(f"插件初始化过程中出现异常: {e}") +async def execute_task(loop, task_func, task_name): + try: + result = await loop.run_in_executor(None, task_func) + if isinstance(result, list) and result: + logger.info(f"{task_name} 已完成,共处理 {len(result)} 个项目") + else: + logger.debug(f"没有新的 {task_name} 需要处理") + return result + except Exception as e: + logger.error(f"{task_name} 时发生错误:{e}", exc_info=True) + return [] + + def register_plugin_api(): """ 插件启动后注册插件API