feat(setup): support asynchronous install plugins on startup

This commit is contained in:
InfinityPacer
2024-10-17 09:37:51 +08:00
parent 4469a1b3b8
commit bcc48e885a
7 changed files with 119 additions and 37 deletions

View File

@@ -1,14 +1,14 @@
from typing import Any, List, Annotated, Optional
from typing import Annotated, Any, List, Optional
from fastapi import APIRouter, Depends, Header
from app import schemas
from app.factory import app
from app.core.config import settings
from app.core.plugin import PluginManager
from app.core.security import verify_token, verify_apikey
from app.core.security import verify_apikey, verify_token
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.factory import app
from app.helper.plugin import PluginHelper
from app.log import logger
from app.scheduler import Scheduler

View File

@@ -3,9 +3,11 @@ import concurrent.futures
import importlib.util
import inspect
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@@ -19,7 +21,7 @@ from app.helper.module import ModuleHelper
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas.types import SystemConfigKey, EventType
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
from app.utils.limit import rate_limit_window
from app.utils.object import ObjectUtils
@@ -271,34 +273,62 @@ class PluginManager(metaclass=Singleton):
# 广播事件
eventmanager.send_event(EventType.PluginReload, data={"plugin_id": plugin_id})
def sync(self):
def sync(self) -> List[str]:
"""
安装本地不存在的在线插件
"""
def install_plugin(plugin):
start_time = time.time()
state, msg = self.pluginhelper.install(pid=plugin.id, repo_url=plugin.repo_url)
elapsed_time = time.time() - start_time
if state:
logger.info(
f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version},耗时:{elapsed_time:.2f}")
sync_plugins.append(plugin.id)
else:
logger.error(
f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg},耗时:{elapsed_time:.2f}")
failed_plugins.append(plugin.id)
if SystemUtils.is_frozen():
return
logger.info("开始安装第三方插件...")
# 已安装插件
return []
# 获取已安装插件列表
install_plugins = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins) or []
# 在线插件
# 获取在线插件列表
online_plugins = self.get_online_plugins()
if not online_plugins:
logger.error("未获取到第三方插件")
return
# 支持更新的插件自动更新
for plugin in online_plugins:
# 只处理已安装的插件
if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id):
# 下载安装
state, msg = self.pluginhelper.install(pid=plugin.id,
repo_url=plugin.repo_url)
# 安装失败
if not state:
logger.error(
f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg}")
continue
logger.info(f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version}")
logger.info("第三方插件安装完成")
# 确定需要安装的插件
plugins_to_install = [
plugin for plugin in online_plugins
if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id)
]
if not plugins_to_install:
return []
logger.info("开始安装第三方插件...")
sync_plugins = []
failed_plugins = []
# 使用 ThreadPoolExecutor 进行并发安装
total_start_time = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(install_plugin, plugin): plugin
for plugin in plugins_to_install
}
for future in as_completed(futures):
plugin = futures[future]
try:
future.result()
except Exception as exc:
logger.error(f"插件 {plugin.plugin_name} 安装过程中出现异常: {exc}")
total_elapsed_time = time.time() - total_start_time
logger.info(
f"第三方插件安装完成,成功:{len(sync_plugins)} 个,"
f"失败:{len(failed_plugins)} 个,总耗时:{total_elapsed_time:.2f}"
)
return sync_plugins
def get_plugin_config(self, pid: str) -> dict:
"""

View File

@@ -359,9 +359,7 @@ class Scheduler(metaclass=Singleton):
}
)
# 注册插件公共服务
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
self.init_plugin_jobs()
# 打印服务
logger.debug(self._scheduler.print_jobs())
@@ -410,6 +408,13 @@ class Scheduler(metaclass=Singleton):
except KeyError:
pass
def init_plugin_jobs(self):
"""
注册插件公共服务
"""
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
def update_plugin_job(self, pid: str):
"""
更新插件定时服务

View File

@@ -1,9 +1,11 @@
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.startup.module_initializer import start_modules, shutdown_modules
from app.startup.routers import init_routers
from app.startup.modules_initializer import shutdown_modules, start_modules
from app.startup.plugins_initializer import init_plugins_async
from app.startup.routers_initializer import init_routers
@asynccontextmanager
@@ -14,6 +16,16 @@ async def lifespan(app: FastAPI):
print("Starting up...")
start_modules(app)
init_routers(app)
yield
print("Shutting down...")
shutdown_modules(app)
plugin_init_task = asyncio.create_task(init_plugins_async())
try:
yield
finally:
print("Shutting down...")
try:
plugin_init_task.cancel()
await plugin_init_task
except asyncio.CancelledError:
print("Plugin installation task cancelled.")
except Exception as e:
print(f"Error during plugin installation shutdown: {e}")
shutdown_modules(app)

View File

@@ -147,8 +147,6 @@ def start_modules(_: FastAPI):
ModuleManager()
# 启动事件消费
EventManager().start()
# 安装在线插件
PluginManager().sync()
# 加载插件
PluginManager().start()
# 启动监控任务

View File

@@ -0,0 +1,37 @@
import asyncio
from app.core.plugin import PluginManager
from app.log import logger
from app.scheduler import Scheduler
async def init_plugins_async():
"""
初始化安装插件并动态注册后台任务及API
"""
try:
loop = asyncio.get_event_loop()
plugin_manager = PluginManager()
scheduler = Scheduler()
sync_plugins = await loop.run_in_executor(None, plugin_manager.sync)
if not sync_plugins:
return
# 为避免初始化插件异常,这里所有插件都进行初始化
logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件")
# 安装完成后重新初始化插件
plugin_manager.init_config()
# 插件启动后注册后台任务
scheduler.init_plugin_jobs()
# 插件启动后注册插件API
register_plugin_api()
logger.info("所有插件初始化完成")
except Exception as e:
logger.error(f"插件初始化过程中出现异常: {e}")
def register_plugin_api():
"""
插件启动后注册插件API
"""
from app.api.endpoints import plugin
plugin.register_plugin_api()