feat(plugin): add state check for commands, APIs, and services

This commit is contained in:
InfinityPacer
2024-10-22 01:36:48 +08:00
parent 0145421885
commit 6b875ef2de
3 changed files with 85 additions and 45 deletions

View File

@@ -31,7 +31,7 @@ def register_plugin_api(plugin_id: Optional[str] = None):
def remove_plugin_api(plugin_id: str):
"""
动态移除插件 API
动态移除单个插件 API
:param plugin_id: 插件 ID
"""
_update_plugin_api_routes(plugin_id, action="remove")
@@ -40,25 +40,29 @@ def remove_plugin_api(plugin_id: str):
def _update_plugin_api_routes(plugin_id: Optional[str], action: str):
"""
插件 API 路由注册和移除
:param plugin_id: 插件 ID如果为 None则处理所有插件
:param action: 'add' 'remove'决定是添加还是移除路由
:param plugin_id: 插件 ID如果 action 为 "add" 且 plugin_id 为 None则处理所有插件
如果 action 为 "remove"plugin_id 必须是有效的插件 ID
:param action: "add""remove",决定是添加还是移除路由
"""
if action not in {"add", "remove"}:
raise ValueError("Action must be 'add' or 'remove'")
is_modified = False
existing_paths = {route.path: route for route in app.routes}
plugin_apis = PluginManager().get_plugin_apis(plugin_id)
for api in plugin_apis:
api_path = f"{PLUGIN_PREFIX}{api.get('path', '')}"
try:
existing_route = existing_paths.get(api_path)
if existing_route:
app.routes.remove(existing_route)
is_modified = True
plugin_ids = [plugin_id] if plugin_id else PluginManager().get_running_plugin_ids()
for plugin_id in plugin_ids:
routes_removed = _remove_routes(plugin_id)
if routes_removed:
is_modified = True
if action == "add":
if action != "add":
continue
# 获取插件的 API 路由信息
plugin_apis = PluginManager().get_plugin_apis(plugin_id)
for api in plugin_apis:
api_path = f"{PLUGIN_PREFIX}{api.get('path', '')}"
try:
api["path"] = api_path
allow_anonymous = api.pop("allow_anonymous", False)
dependencies = api.setdefault("dependencies", [])
@@ -66,9 +70,9 @@ def _update_plugin_api_routes(plugin_id: Optional[str], action: str):
dependencies.append(Depends(verify_apikey))
app.add_api_route(**api, tags=["plugin"])
is_modified = True
except Exception as e:
logger.error(f"Error {action}ing route {api_path}: {str(e)}")
logger.debug(f"Added plugin route: {api_path}")
except Exception as e:
logger.error(f"Error adding plugin route {api_path}: {str(e)}")
if is_modified:
_clean_protected_routes(existing_paths)
@@ -76,6 +80,27 @@ def _update_plugin_api_routes(plugin_id: Optional[str], action: str):
app.setup()
def _remove_routes(plugin_id: str) -> bool:
"""
移除与单个插件相关的路由
:param plugin_id: 插件 ID
:return: 是否有路由被移除
"""
if not plugin_id:
return False
prefix = f"{PLUGIN_PREFIX}/{plugin_id}/"
routes_to_remove = [route for route in app.routes if route.path.startswith(prefix)]
removed = False
for route in routes_to_remove:
try:
app.routes.remove(route)
removed = True
logger.debug(f"Removed plugin route: {route.path}")
except Exception as e:
logger.error(f"Error removing plugin route {route.path}: {str(e)}")
return removed
def _clean_protected_routes(existing_paths: dict):
"""
清理受保护的路由,防止在插件操作中被删除或重复添加

View File

@@ -435,27 +435,42 @@ class PluginManager(metaclass=Singleton):
)
return None
def get_plugin_commands(self) -> List[Dict[str, Any]]:
def get_plugin_state(self, pid: str) -> bool:
"""
获取插件状态
:param pid: 插件ID
"""
plugin = self._running_plugins.get(pid)
return plugin.get_state() if plugin else False
def get_plugin_commands(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件命令
[{
"cmd": "/xx",
"event": EventType.xx,
"desc": "xxxx",
"data": {}
"data": {},
"pid": "",
}]
"""
ret_commands = []
for _, plugin in self._running_plugins.items():
if hasattr(plugin, "get_command") \
and ObjectUtils.check_method(plugin.get_command):
for plugin_id, plugin in self._running_plugins.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_command") and ObjectUtils.check_method(plugin.get_command):
try:
ret_commands += plugin.get_command() or []
if not plugin.get_state():
continue
commands = plugin.get_command() or []
for command in commands:
command["pid"] = plugin_id
ret_commands.extend(commands)
except Exception as e:
logger.error(f"获取插件命令出错:{str(e)}")
return ret_commands
def get_plugin_apis(self, plugin_id: str = None) -> List[Dict[str, Any]]:
def get_plugin_apis(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件API
[{
@@ -468,21 +483,22 @@ class PluginManager(metaclass=Singleton):
}]
"""
ret_apis = []
for pid, plugin in self._running_plugins.items():
if plugin_id and pid != plugin_id:
for plugin_id, plugin in self._running_plugins.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_api") \
and ObjectUtils.check_method(plugin.get_api):
if hasattr(plugin, "get_api") and ObjectUtils.check_method(plugin.get_api):
try:
if not plugin.get_state():
continue
apis = plugin.get_api() or []
for api in apis:
api["path"] = f"/{pid}{api['path']}"
api["path"] = f"/{plugin_id}{api['path']}"
ret_apis.extend(apis)
except Exception as e:
logger.error(f"获取插件 {pid} API出错{str(e)}")
logger.error(f"获取插件 {plugin_id} API出错{str(e)}")
return ret_apis
def get_plugin_services(self) -> List[Dict[str, Any]]:
def get_plugin_services(self, pid: Optional[str] = None) -> List[Dict[str, Any]]:
"""
获取插件服务
[{
@@ -490,19 +506,21 @@ class PluginManager(metaclass=Singleton):
"name": "服务名称",
"trigger": "触发器cron、interval、date、CronTrigger.from_crontab()",
"func": self.xxx,
"kwagrs": {} # 定时器参数
"kwargs": {} # 定时器参数
}]
"""
ret_services = []
for pid, plugin in self._running_plugins.items():
if hasattr(plugin, "get_service") \
and ObjectUtils.check_method(plugin.get_service):
for plugin_id, plugin in self._running_plugins.items():
if pid and pid != plugin_id:
continue
if hasattr(plugin, "get_service") and ObjectUtils.check_method(plugin.get_service):
try:
services = plugin.get_service()
if services:
ret_services.extend(services)
if not plugin.get_state():
continue
services = plugin.get_service() or []
ret_services.extend(services)
except Exception as e:
logger.error(f"获取插件 {pid} 服务出错:{str(e)}")
logger.error(f"获取插件 {plugin_id} 服务出错:{str(e)}")
return ret_services
def get_plugin_dashboard_meta(self):

View File

@@ -95,8 +95,7 @@ class Scheduler(metaclass=Singleton):
)
)
PluginManager().init_config()
for plugin_id in PluginManager().get_running_plugin_ids():
self.update_plugin_job(plugin_id)
self.init_plugin_jobs()
else:
self._auth_count += 1
@@ -410,7 +409,7 @@ class Scheduler(metaclass=Singleton):
def init_plugin_jobs(self):
"""
注册插件公共服务
初始化插件定时服务
"""
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
@@ -419,14 +418,14 @@ class Scheduler(metaclass=Singleton):
"""
更新插件定时服务
"""
if not self._scheduler:
if not self._scheduler or not pid:
return
# 移除该插件的全部服务
self.remove_plugin_job(pid)
# 获取插件服务列表
with self._lock:
try:
plugin_services = PluginManager().run_plugin_method(pid, "get_service") or []
plugin_services = PluginManager().get_plugin_services(pid=pid)
except Exception as e:
logger.error(f"运行插件 {pid} 服务失败:{str(e)} - {traceback.format_exc()}")
return
@@ -451,9 +450,7 @@ class Scheduler(metaclass=Singleton):
id=sid,
name=service["name"],
**service["kwargs"],
kwargs={
'job_id': job_id
}
kwargs={"job_id": job_id}
)
logger.info(f"注册插件{plugin_name}服务:{service['name']} - {service['trigger']}")
except Exception as e: