diff --git a/app/api/endpoints/plugin.py b/app/api/endpoints/plugin.py index f280d413..3d83a4ed 100644 --- a/app/api/endpoints/plugin.py +++ b/app/api/endpoints/plugin.py @@ -1,14 +1,14 @@ -from typing import Any, List, Annotated, Optional +from typing import Annotated, Any, List, Optional from fastapi import APIRouter, Depends, Header from app import schemas -from app.factory import app from app.core.config import settings from app.core.plugin import PluginManager -from app.core.security import verify_token, verify_apikey +from app.core.security import verify_apikey, verify_token from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser +from app.factory import app from app.helper.plugin import PluginHelper from app.log import logger from app.scheduler import Scheduler diff --git a/app/core/plugin.py b/app/core/plugin.py index 271be6ae..ef426dec 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -3,9 +3,11 @@ import concurrent.futures import importlib.util import inspect import os +import time import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -19,7 +21,7 @@ from app.helper.module import ModuleHelper from app.helper.plugin import PluginHelper from app.helper.sites import SitesHelper from app.log import logger -from app.schemas.types import SystemConfigKey, EventType +from app.schemas.types import EventType, SystemConfigKey from app.utils.crypto import RSAUtils from app.utils.limit import rate_limit_window from app.utils.object import ObjectUtils @@ -271,34 +273,62 @@ class PluginManager(metaclass=Singleton): # 广播事件 eventmanager.send_event(EventType.PluginReload, data={"plugin_id": plugin_id}) - def sync(self): + def sync(self) -> List[str]: """ 安装本地不存在的在线插件 """ + def install_plugin(plugin): + start_time = time.time() + state, msg = self.pluginhelper.install(pid=plugin.id, repo_url=plugin.repo_url) + elapsed_time = time.time() - start_time + if state: + logger.info( + f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version},耗时:{elapsed_time:.2f} 秒") + sync_plugins.append(plugin.id) + else: + logger.error( + f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg},耗时:{elapsed_time:.2f} 秒") + failed_plugins.append(plugin.id) + if SystemUtils.is_frozen(): - return - logger.info("开始安装第三方插件...") - # 已安装插件 + return [] + + # 获取已安装插件列表 install_plugins = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins) or [] - # 在线插件 + # 获取在线插件列表 online_plugins = self.get_online_plugins() - if not online_plugins: - logger.error("未获取到第三方插件") - return - # 支持更新的插件自动更新 - for plugin in online_plugins: - # 只处理已安装的插件 - if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id): - # 下载安装 - state, msg = self.pluginhelper.install(pid=plugin.id, - repo_url=plugin.repo_url) - # 安装失败 - if not state: - logger.error( - f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg}") - continue - logger.info(f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version}") - logger.info("第三方插件安装完成") + # 确定需要安装的插件 + plugins_to_install = [ + plugin for plugin in online_plugins + if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id) + ] + + if not plugins_to_install: + return [] + logger.info("开始安装第三方插件...") + sync_plugins = [] + failed_plugins = [] + + # 使用 ThreadPoolExecutor 进行并发安装 + total_start_time = time.time() + with ThreadPoolExecutor(max_workers=5) as executor: + futures = { + executor.submit(install_plugin, plugin): plugin + for plugin in plugins_to_install + } + for future in as_completed(futures): + plugin = futures[future] + try: + future.result() + except Exception as exc: + logger.error(f"插件 {plugin.plugin_name} 安装过程中出现异常: {exc}") + + total_elapsed_time = time.time() - total_start_time + logger.info( + f"第三方插件安装完成,成功:{len(sync_plugins)} 个," + f"失败:{len(failed_plugins)} 个,总耗时:{total_elapsed_time:.2f} 秒" + ) + return sync_plugins def get_plugin_config(self, pid: str) -> dict: """ diff --git a/app/scheduler.py b/app/scheduler.py index 4f151f60..ae4c42df 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -359,9 +359,7 @@ class Scheduler(metaclass=Singleton): } ) - # 注册插件公共服务 - for pid in PluginManager().get_running_plugin_ids(): - self.update_plugin_job(pid) + self.init_plugin_jobs() # 打印服务 logger.debug(self._scheduler.print_jobs()) @@ -410,6 +408,13 @@ class Scheduler(metaclass=Singleton): except KeyError: pass + def init_plugin_jobs(self): + """ + 注册插件公共服务 + """ + for pid in PluginManager().get_running_plugin_ids(): + self.update_plugin_job(pid) + def update_plugin_job(self, pid: str): """ 更新插件定时服务 diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index 7bb0e7d3..71e0011c 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -1,9 +1,11 @@ +import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI -from app.startup.module_initializer import start_modules, shutdown_modules -from app.startup.routers import init_routers +from app.startup.modules_initializer import shutdown_modules, start_modules +from app.startup.plugins_initializer import init_plugins_async +from app.startup.routers_initializer import init_routers @asynccontextmanager @@ -14,6 +16,16 @@ async def lifespan(app: FastAPI): print("Starting up...") start_modules(app) init_routers(app) - yield - print("Shutting down...") - shutdown_modules(app) + plugin_init_task = asyncio.create_task(init_plugins_async()) + try: + yield + finally: + print("Shutting down...") + try: + plugin_init_task.cancel() + await plugin_init_task + except asyncio.CancelledError: + print("Plugin installation task cancelled.") + except Exception as e: + print(f"Error during plugin installation shutdown: {e}") + shutdown_modules(app) diff --git a/app/startup/module_initializer.py b/app/startup/modules_initializer.py similarity index 98% rename from app/startup/module_initializer.py rename to app/startup/modules_initializer.py index 1bd86a89..1e06892f 100644 --- a/app/startup/module_initializer.py +++ b/app/startup/modules_initializer.py @@ -147,8 +147,6 @@ def start_modules(_: FastAPI): ModuleManager() # 启动事件消费 EventManager().start() - # 安装在线插件 - PluginManager().sync() # 加载插件 PluginManager().start() # 启动监控任务 diff --git a/app/startup/plugins_initializer.py b/app/startup/plugins_initializer.py new file mode 100644 index 00000000..c359d324 --- /dev/null +++ b/app/startup/plugins_initializer.py @@ -0,0 +1,37 @@ +import asyncio + +from app.core.plugin import PluginManager +from app.log import logger +from app.scheduler import Scheduler + + +async def init_plugins_async(): + """ + 初始化安装插件,并动态注册后台任务及API + """ + try: + loop = asyncio.get_event_loop() + plugin_manager = PluginManager() + scheduler = Scheduler() + sync_plugins = await loop.run_in_executor(None, plugin_manager.sync) + if not sync_plugins: + return + # 为避免初始化插件异常,这里所有插件都进行初始化 + logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件") + # 安装完成后重新初始化插件 + plugin_manager.init_config() + # 插件启动后注册后台任务 + scheduler.init_plugin_jobs() + # 插件启动后注册插件API + register_plugin_api() + logger.info("所有插件初始化完成") + except Exception as e: + logger.error(f"插件初始化过程中出现异常: {e}") + + +def register_plugin_api(): + """ + 插件启动后注册插件API + """ + from app.api.endpoints import plugin + plugin.register_plugin_api() diff --git a/app/startup/routers.py b/app/startup/routers_initializer.py similarity index 100% rename from app/startup/routers.py rename to app/startup/routers_initializer.py