diff --git a/app/core/plugin.py b/app/core/plugin.py index 37c8bda5..7a55d7c8 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -1,3 +1,4 @@ +import ast import asyncio import concurrent import concurrent.futures @@ -9,12 +10,12 @@ import time import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +import threading from typing import Any, Dict, List, Optional, Type, Union, Callable, Tuple from fastapi import HTTPException from starlette import status -from watchdog.events import FileSystemEventHandler -from watchdog.observers import Observer +from watchfiles import watch from app import schemas from app.core.cache import cached @@ -27,93 +28,12 @@ from app.helper.sites import SitesHelper # noqa from app.log import logger from app.schemas.types import EventType, SystemConfigKey from app.utils.crypto import RSAUtils -from app.utils.debounce import debounce from app.utils.object import ObjectUtils from app.utils.singleton import Singleton from app.utils.string import StringUtils from app.utils.system import SystemUtils -class PluginMonitorHandler(FileSystemEventHandler): - - def on_modified(self, event): - """ - 插件文件修改后重载 - """ - if event.is_directory: - return - # 使用 pathlib 处理文件路径,跳过非 .py 文件以及 pycache 目录中的文件 - event_path = Path(event.src_path) - if not event_path.name.endswith(".py") or "pycache" in event_path.parts: - return - - # 防抖模式下处理文件修改事件 - self._handle_modification(event_path) - - @debounce(interval=1.0, leading=False, source="PluginMonitorHandler", enable_logging=False) - def _handle_modification(self, event_path: Path): - """ - 处理文件修改事件 - :param event_path: - :return: - """ - logger.debug(f"防抖计时结束,开始处理文件修改事件: {event_path}") - # 解析插件ID - pid = self._get_plugin_id_from_path(event_path) - if not pid: - logger.debug(f"文件不属于任何有效插件,已忽略: {event_path}") - return - - # 触发重载 - self.__reload_plugin(pid) - - @staticmethod - def _get_plugin_id_from_path(event_path: Path) -> Optional[str]: - """ - 根据文件路径解析出插件的ID。 - :param event_path: 被修改文件的 Path 对象。 - :return: 插件ID字符串,如果不是有效插件文件则返回 None。 - """ - try: - plugins_root = settings.ROOT_PATH / "app" / "plugins" - # 确保修改的文件在 plugins 目录下 - if plugins_root not in event_path.parents: - return None - - # 找到插件的根目录 - plugin_dir = event_path.parent - while plugin_dir.parent != plugins_root: - plugin_dir = plugin_dir.parent - if plugin_dir == plugins_root: # 防止无限循环 - break - - init_file = plugin_dir / "__init__.py" - if not init_file.exists(): - return None - - # 读取 __init__.py 文件,查找插件主类名 - with open(init_file, "r", encoding="utf-8") as f: - for line in f: - if line.startswith("class") and "(_PluginBase)" in line: - # 解析出类名作为插件ID - return line.split("class ")[1].split("(_PluginBase)")[0].strip() - return None - except Exception as e: - logger.error(f"从路径解析插件ID时出错: {e}") - return None - - @staticmethod - def __reload_plugin(pid): - """ - 重新加载插件 - """ - try: - logger.info(f"插件 {pid} 文件修改,重新加载...") - PluginManager().reload_plugin(pid) - except Exception as e: - logger.error(f"插件文件修改后重载出错:{str(e)}") - - class PluginManager(metaclass=Singleton): """ 插件管理器 @@ -126,8 +46,10 @@ class PluginManager(metaclass=Singleton): self._running_plugins: dict = {} # 配置Key self._config_key: str = "plugin.%s" - # 监听器 - self._observer: Observer = None + # 监控线程 + self._monitor_thread: Optional[threading.Thread] = None + # 监控停止事件 + self._stop_monitor_event = threading.Event() # 开发者模式监测插件修改 if settings.DEV or settings.PLUGIN_AUTO_RELOAD: self.__start_monitor() @@ -347,10 +269,9 @@ class PluginManager(metaclass=Singleton): 重新加载插件文件修改监测 """ if settings.DEV or settings.PLUGIN_AUTO_RELOAD: - if self._observer and self._observer.is_alive(): - logger.info("插件文件修改监测已经在运行中...") - else: - self.__start_monitor() + # 先关闭已有监测,再重新启动 + self.stop_monitor() + self.__start_monitor() else: self.stop_monitor() @@ -358,25 +279,124 @@ class PluginManager(metaclass=Singleton): """ 启用监测插件文件修改监测 """ + if self._monitor_thread and self._monitor_thread.is_alive(): + logger.info("插件文件修改监测已经在运行中...") + return + logger.info("开始监测插件文件修改...") - monitor_handler = PluginMonitorHandler() - self._observer = Observer() - self._observer.schedule(monitor_handler, str(settings.ROOT_PATH / "app" / "plugins"), recursive=True) - self._observer.start() + + # 在启动新线程之前,确保停止事件是清除状态 + self._stop_monitor_event.clear() + + # 创建并启动监控线程 + self._monitor_thread = threading.Thread( + target=self._run_file_watcher, + daemon=True + ) + self._monitor_thread.start() def stop_monitor(self): """ 停止监测插件文件修改监测 """ - # 停止监测 - if self._observer and self._observer.is_alive(): + if self._monitor_thread and self._monitor_thread.is_alive(): logger.info("正在停止插件文件修改监测...") - self._observer.stop() - self._observer.join() + self._stop_monitor_event.set() + self._monitor_thread.join(timeout=5) + if self._monitor_thread.is_alive(): + logger.warning("插件文件修改监测线程在5秒内未能正常停止。") + self._monitor_thread = None logger.info("插件文件修改监测停止完成") else: logger.info("未启用插件文件修改监测,无需停止") + def _run_file_watcher(self): + """ + 运行 watchfiles 监视器的主循环。 + """ + # 监视插件目录 + plugins_path = str(settings.ROOT_PATH / "app" / "plugins") + logger.info(">>> 监控线程已启动,准备进入watch循环...") + # 使用 watchfiles 监视目录变化,并响应变化事件 + # Todo: yield_on_timeout = True 时,每秒检查停止事件,会返回空集合;后续可以考虑用来做心跳之类的功能? + for changes in watch(plugins_path, stop_event=self._stop_monitor_event, rust_timeout=1000, + yield_on_timeout=True): + # 如果收到停止事件,退出循环 + if not changes: + continue + + # 处理变化事件 + plugins_to_reload = set() + for _change_type, path_str in changes: + event_path = Path(path_str) + + # 跳过非 .py 文件以及 pycache 目录中的文件 + if not event_path.name.endswith(".py") or "__pycache__" in event_path.parts: + continue + + # 解析插件ID + pid = self._get_plugin_id_from_path(event_path) + # 跳过无效插件文件 + if pid: + # 收集需要重载的插件ID,自动去重,避免重复重载 + plugins_to_reload.add(pid) + + # 触发重载 + if plugins_to_reload: + logger.info(f"检测到插件文件变化,准备重载: {list(plugins_to_reload)}") + for pid in plugins_to_reload: + try: + self.reload_plugin(pid) + except Exception as e: + logger.error(f"插件 {pid} 热重载失败: {e}", exc_info=True) + + @staticmethod + def _get_plugin_id_from_path(event_path: Path) -> Optional[str]: + """ + 根据文件路径解析出插件的ID。 + :param event_path: 被修改文件的 Path 对象。 + :return: 插件ID字符串,如果不是有效插件文件则返回 None。 + """ + try: + plugins_root = settings.ROOT_PATH / "app" / "plugins" + # 确保修改的文件在 plugins 目录下 + if plugins_root not in event_path.parents: + return None + + # 找到插件的根目录 + plugin_dir = event_path.parent + while plugin_dir.parent != plugins_root: + plugin_dir = plugin_dir.parent + if plugin_dir == plugins_root: # 防止无限循环 + break + + init_file = plugin_dir / "__init__.py" + if not init_file.exists(): + return None + + # 读取 __init__.py 文件,查找插件主类名 + with open(init_file, "r", encoding="utf-8") as f: + source_code = f.read() + + tree = ast.parse(source_code) + + # 遍历AST,查找继承自 _PluginBase 的类 + for node in ast.walk(tree): + # 检查节点是否为类定义 + if isinstance(node, ast.ClassDef): + # 遍历该类的所有基类 + for base in node.bases: + # 检查基类是否是我们寻找的 _PluginBase + # ast.Name 用于处理简单的基类名 + if isinstance(base, ast.Name) and base.id == '_PluginBase': + # 返回这个类的名字 + return node.name + + return None + except Exception as e: + logger.error(f"从路径解析插件ID时出错: {e}") + return None + @staticmethod def __stop_plugin(plugin: Any): """ diff --git a/requirements.in b/requirements.in index d2775425..bc8ee163 100644 --- a/requirements.in +++ b/requirements.in @@ -48,6 +48,7 @@ psutil~=7.0.0 python-dotenv~=1.1.1 python-hosts~=1.1.2 watchdog~=6.0.0 +watchfiles~=1.1.0 cacheout~=0.16.0 click~=8.2.1 requests-cache~=1.2.1