feat:支持vue原生插件页面

This commit is contained in:
jxxghp
2025-05-03 10:03:44 +08:00
parent 491009636a
commit c692a3c80e
4 changed files with 207 additions and 88 deletions

View File

@@ -1,13 +1,12 @@
import concurrent
import concurrent.futures
import importlib.util
import inspect
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Type, Union
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@@ -220,6 +219,14 @@ class PluginManager(metaclass=Singleton):
self._running_plugins = {}
logger.info("插件停止完成")
@property
def running_plugins(self):
"""
获取运行态插件列表
:return: 运行态插件列表
"""
return self._running_plugins
def reload_monitor(self):
"""
重新加载插件文件修改监测
@@ -407,68 +414,6 @@ class PluginManager(metaclass=Singleton):
self.plugindata.del_data(pid)
return True
def get_plugin_form(self, pid: str) -> Tuple[List[dict], Dict[str, Any]]:
"""
获取插件表单
:param pid: 插件ID
"""
plugin = self._running_plugins.get(pid)
if not plugin:
return [], {}
if hasattr(plugin, "get_form"):
return plugin.get_form() or ([], {})
return [], {}
def get_plugin_page(self, pid: str) -> List[dict]:
"""
获取插件页面
:param pid: 插件ID
"""
plugin = self._running_plugins.get(pid)
if not plugin:
return []
if hasattr(plugin, "get_page"):
return plugin.get_page() or []
return []
def get_plugin_dashboard(self, pid: str, key: Optional[str] = None, **kwargs) -> Optional[schemas.PluginDashboard]:
"""
获取插件仪表盘
:param pid: 插件ID
:param key: 仪表盘key
"""
def __get_params_count(func: Callable):
"""
获取函数的参数信息
"""
signature = inspect.signature(func)
return len(signature.parameters)
plugin = self._running_plugins.get(pid)
if not plugin:
return None
if hasattr(plugin, "get_dashboard"):
# 检查方法的参数个数
params_count = __get_params_count(plugin.get_dashboard)
if params_count > 1:
dashboard: Tuple = plugin.get_dashboard(key=key, **kwargs)
elif params_count > 0:
dashboard: Tuple = plugin.get_dashboard(**kwargs)
else:
dashboard: Tuple = plugin.get_dashboard()
if dashboard:
cols, attrs, elements = dashboard
return schemas.PluginDashboard(
id=pid,
name=plugin.plugin_name,
key=key or "",
cols=cols or {},
elements=elements,
attrs=attrs or {}
)
return None
def get_plugin_state(self, pid: str) -> bool:
"""
获取插件状态