diff --git a/app/core/event.py b/app/core/event.py index a2f93e8f..cbdb378a 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,3 +1,4 @@ +import inspect import threading import time import uuid @@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton): """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__event_queue = PriorityQueue() # 优先级队列 - self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者 - self.__chain_subscribers: Dict[ - ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器) + self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者 + self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者 self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 @@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton): threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, - priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]: + priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: """ 发送事件,根据事件类型决定是广播事件还是链式事件 :param etype: 事件类型 (EventType 或 ChainEventType) @@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton): else: logger.error(f"Unknown event type: {etype}") - def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None], + def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable, priority: int = DEFAULT_EVENT_PRIORITY): """ 注册事件处理器,将处理器添加到对应的事件订阅列表中 @@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton): self.__broadcast_subscribers[event_type].append(handler) logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") - def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]): + def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable): """ 移除事件处理器,将处理器从对应事件的订阅列表中删除 :param event_type: 事件类型 (EventType 或 ChainEventType) @@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton): self.__broadcast_subscribers[event_type].remove(handler) logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") - def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None): + def disable_event_handler(self, target: Union[Callable, type]): """ - 禁用指定名称的事件处理器或事件处理类,防止其响应事件 - :param handler_name: 要禁用的事件处理器名称 - :param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器 + 禁用指定的事件处理器或事件处理器类 + :param target: 处理器函数或类 """ - if class_name: - self.__disabled_classes.add(class_name) - logger.debug(f"Disabled event handler class: {class_name}") + identifier = self.__get_handler_identifier(target) + if isinstance(target, type): + self.__disabled_classes.add(identifier) + logger.debug(f"Disabled event handler class: {identifier}") else: - self.__disabled_handlers.add(handler_name) - logger.debug(f"Disabled event handler: {handler_name}") + self.__disabled_handlers.add(identifier) + logger.debug(f"Disabled event handler: {identifier}") - def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None): + def enable_event_handler(self, target: Union[Callable, type]): """ - 启用指定名称的事件处理器或事件处理类,使其可以继续响应事件 - :param handler_name: 要启用的事件处理器名称 - :param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器 + 启用指定的事件处理器或事件处理器类 + :param target: 处理器函数或类 """ - if class_name: - self.__disabled_classes.discard(class_name) - logger.debug(f"Enabled event handler class: {class_name}") + identifier = self.__get_handler_identifier(target) + if isinstance(target, type): + self.__disabled_classes.discard(identifier) + logger.debug(f"Enabled event handler class: {identifier}") else: - self.__disabled_handlers.discard(handler_name) - logger.debug(f"Enabled event handler: {handler_name}") + self.__disabled_handlers.discard(identifier) + logger.debug(f"Enabled event handler: {identifier}") - def check(self, etype: Union[EventType, ChainEventType]) -> bool: - """ - 检查是否有启用的事件处理器可以响应某个事件类型 - :param etype: 事件类型 (EventType 或 ChainEventType) - :return: 返回是否存在可用的处理器 - """ - if isinstance(etype, ChainEventType): - handlers = self.__chain_subscribers.get(etype, []) - return any( - handler.__name__ not in self.__disabled_handlers and - handler.__qualname__.split(".")[0] not in self.__disabled_classes - for _, handler in handlers - ) - else: - handlers = self.__broadcast_subscribers.get(etype, []) - return any( - handler.__name__ not in self.__disabled_handlers and - handler.__qualname__.split(".")[0] not in self.__disabled_classes - for handler in handlers - ) - - def visualize_handlers(self) -> List[Dict[str, str]]: + def visualize_handlers(self) -> List[Dict]: """ 可视化所有事件处理器,包括是否被禁用的状态 - :return: 处理器列表,包含处理器名称、类名和状态 + :return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态 """ handler_info = [] - with self.__lock: - for event_type, handlers in self.__broadcast_subscribers.items(): - for handler in handlers: - class_name = handler.__qualname__.split(".")[0] - status = ( - "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" - ) - handler_info.append({ - "event_type": event_type.value, - "handler_name": handler.__name__, - "class_name": class_name, - "status": status - }) - for event_type, handlers in self.__chain_subscribers.items(): - for priority, handler in handlers: - class_name = handler.__qualname__.split(".")[0] - status = ( - "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" - ) - handler_info.append({ - "event_type": event_type.value, - "handler_name": handler.__name__, - "class_name": class_name, - "priority": priority, - "status": status - }) + # 统一处理广播事件和链式事件 + for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items(): + for handler_data in subscribers: + if isinstance(subscribers, dict): + priority, handler = handler_data + else: + priority = None + handler = handler_data + # 获取处理器的唯一标识符 + handler_id = self.__get_handler_identifier(handler) + # 检查处理器的启用状态 + status = "enabled" if self.__is_handler_enabled(handler) else "disabled" + # 构建处理器信息字典 + handler_dict = { + "event_type": event_type.value, + "handler_identifier": handler_id, + "status": status + } + if priority is not None: + handler_dict["priority"] = priority + handler_info.append(handler_dict) return handler_info - def __trigger_chain_event(self, event: Event) -> Dict: + @staticmethod + def __get_handler_identifier(target: Union[Callable, type]) -> str: """ - 触发链式事件,按顺序调用订阅的处理器 - :param event: 要处理的事件对象 - :return: 返回处理后的事件数据 + 获取处理器或处理器类的唯一标识符,包括模块名和类名 + :param target: 处理器函数或类 + :return: 唯一标识符 + """ + if isinstance(target, type): + # 如果是类,使用模块名和类名 + module_name = target.__module__ + class_name = target.__qualname__ + return f"{module_name}.{class_name}" + else: + # 如果是函数或方法,使用 inspect.getmodule 来获取模块名 + module = inspect.getmodule(target) + module_name = module.__name__ if module else "unknown_module" + qualname = target.__qualname__ + return f"{module_name}.{qualname}" + + def __is_handler_enabled(self, handler: Callable) -> bool: + """ + 检查处理器是否已启用(没有被禁用) + :param handler: 处理器函数 + :return: 如果处理器启用则返回 True,否则返回 False + """ + # 获取处理器的唯一标识符 + handler_id = self.__get_handler_identifier(handler) + + # 获取处理器所属类的唯一标识符 + class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None + + # 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False + if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes): + return False + + return True + + def __trigger_chain_event(self, event: Event) -> Event: + """ + 触发链式事件,按顺序调用订阅的处理器,并记录处理耗时 """ logger.debug(f"Triggering synchronous chain event: {event}") self.__dispatch_chain_event(event) - return event.event_data + return event def __trigger_broadcast_event(self, event: Event): """ @@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton): def __dispatch_chain_event(self, event: Event): """ - 同步方式调度链式事件,按优先级顺序逐个调用事件处理器 + 同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间 :param event: 要调度的事件对象 """ handlers = self.__chain_subscribers.get(event.event_type, []) self.__log_event_lifecycle(event, "started") for priority, handler in handlers: - class_name = handler.__qualname__.split(".")[0] - if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: - start_time = time.time() - try: - handler(event.event_data) - logger.debug( - f"Handler {handler.__qualname__} (Priority: {priority}) " - f"completed in {time.time() - start_time:.3f}s") - except Exception as e: - self.__handle_event_error(event, handler, e) + start_time = time.time() + self.__safe_invoke_handler(handler, event) + logger.debug( + f"Handler {handler.__qualname__} (Priority: {priority}) " + f"completed in {time.time() - start_time:.3f}s") + self.__log_event_lifecycle(event, "completed") def __dispatch_broadcast_event(self, event: Event): @@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton): """ handlers = self.__broadcast_subscribers.get(event.event_type, []) for handler in handlers: - class_name = handler.__qualname__.split(".")[0] - if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: - self.__executor.submit(self.__safe_invoke_handler, handler, event) + self.__executor.submit(self.__safe_invoke_handler, handler, event) - def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event): + def __safe_invoke_handler(self, handler: Callable, event: Event): """ 安全调用事件处理器,捕获异常并记录日志 :param handler: 要调用的处理器 :param event: 事件对象 """ + if not self.__is_handler_enabled(handler): + logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.") + return try: - handler(event.event_data) + handler(event) except Exception as e: self.__handle_event_error(event, handler, e) @@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton): - 或事件类型成员的列表 """ - def decorator(f: Callable[[Dict], None]): + def decorator(f: Callable): event_list = [] # 如果传入的是列表,处理每个事件类型 diff --git a/app/core/plugin.py b/app/core/plugin.py index 315510fa..ff365006 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -7,7 +7,6 @@ import traceback from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type -from app.helper.sites import SitesHelper from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -18,6 +17,7 @@ from app.db.plugindata_oper import PluginDataOper from app.db.systemconfig_oper import SystemConfigOper from app.helper.module import ModuleHelper from app.helper.plugin import PluginHelper +from app.helper.sites import SitesHelper from app.log import logger from app.schemas.types import SystemConfigKey from app.utils.crypto import RSAUtils @@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton): # 未安装的不加载 if plugin_id not in installed_plugins: # 设置事件状态为不可用 - eventmanager.disable_event_handler(class_name=plugin_id) + eventmanager.disable_event_handler(plugin) continue # 生成实例 plugin_obj = plugin() @@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton): logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}") # 启用的插件才设置事件注册状态可用 if plugin_obj.get_state(): - eventmanager.enable_event_handler(class_name=plugin_id) + eventmanager.enable_event_handler(plugin) else: - eventmanager.disable_event_handler(class_name=plugin_id) + eventmanager.disable_event_handler(plugin) except Exception as err: logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}") @@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton): :param plugin_id: 插件ID :param conf: 插件配置 """ - if not self._running_plugins.get(plugin_id): + plugin = self._running_plugins.get(plugin_id) + if not plugin: return - self._running_plugins[plugin_id].init_plugin(conf) - if self._running_plugins[plugin_id].get_state(): - # 设置启用的插件事件注册状态可用 - eventmanager.enable_event_handler(class_name=plugin_id) + # 初始化插件 + plugin.init_plugin(conf) + # 检查插件状态并启用/禁用事件处理器 + if plugin.get_state(): + # 启用插件类的事件处理器 + eventmanager.enable_event_handler(type(plugin)) else: - # 设置事件状态为不可用 - eventmanager.disable_event_handler(class_name=plugin_id) + # 禁用插件类的事件处理器 + eventmanager.disable_event_handler(type(plugin)) def stop(self, pid: str = None): """