feat(event): optimize handler

This commit is contained in:
InfinityPacer
2024-09-20 16:26:45 +08:00
parent 3bee5a8a86
commit be63e9ed15
2 changed files with 115 additions and 106 deletions

View File

@@ -1,3 +1,4 @@
import inspect
import threading import threading
import time import time
import uuid import uuid
@@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton):
""" """
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_queue = PriorityQueue() # 优先级队列 self.__event_queue = PriorityQueue() # 优先级队列
self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者 self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[ self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者
ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器)
self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁 self.__lock = threading.Lock() # 线程锁
@@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton):
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]: priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
""" """
发送事件,根据事件类型决定是广播事件还是链式事件 发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 ChainEventType) :param etype: 事件类型 (EventType 或 ChainEventType)
@@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton):
else: else:
logger.error(f"Unknown event type: {etype}") logger.error(f"Unknown event type: {etype}")
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None], def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
priority: int = DEFAULT_EVENT_PRIORITY): priority: int = DEFAULT_EVENT_PRIORITY):
""" """
注册事件处理器,将处理器添加到对应的事件订阅列表中 注册事件处理器,将处理器添加到对应的事件订阅列表中
@@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton):
self.__broadcast_subscribers[event_type].append(handler) self.__broadcast_subscribers[event_type].append(handler)
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]): def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
""" """
移除事件处理器,将处理器从对应事件的订阅列表中删除 移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 ChainEventType) :param event_type: 事件类型 (EventType 或 ChainEventType)
@@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton):
self.__broadcast_subscribers[event_type].remove(handler) self.__broadcast_subscribers[event_type].remove(handler)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None): def disable_event_handler(self, target: Union[Callable, type]):
""" """
禁用指定名称的事件处理器或事件处理类,防止其响应事件 禁用指定的事件处理器或事件处理
:param handler_name: 要禁用的事件处理器名称 :param target: 处理器函数或类
:param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器
""" """
if class_name: identifier = self.__get_handler_identifier(target)
self.__disabled_classes.add(class_name) if isinstance(target, type):
logger.debug(f"Disabled event handler class: {class_name}") self.__disabled_classes.add(identifier)
logger.debug(f"Disabled event handler class: {identifier}")
else: else:
self.__disabled_handlers.add(handler_name) self.__disabled_handlers.add(identifier)
logger.debug(f"Disabled event handler: {handler_name}") logger.debug(f"Disabled event handler: {identifier}")
def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None): def enable_event_handler(self, target: Union[Callable, type]):
""" """
启用指定名称的事件处理器或事件处理类,使其可以继续响应事件 启用指定的事件处理器或事件处理
:param handler_name: 要启用的事件处理器名称 :param target: 处理器函数或类
:param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器
""" """
if class_name: identifier = self.__get_handler_identifier(target)
self.__disabled_classes.discard(class_name) if isinstance(target, type):
logger.debug(f"Enabled event handler class: {class_name}") self.__disabled_classes.discard(identifier)
logger.debug(f"Enabled event handler class: {identifier}")
else: else:
self.__disabled_handlers.discard(handler_name) self.__disabled_handlers.discard(identifier)
logger.debug(f"Enabled event handler: {handler_name}") logger.debug(f"Enabled event handler: {identifier}")
def check(self, etype: Union[EventType, ChainEventType]) -> bool: def visualize_handlers(self) -> List[Dict]:
"""
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 ChainEventType)
:return: 返回是否存在可用的处理器
"""
if isinstance(etype, ChainEventType):
handlers = self.__chain_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for _, handler in handlers
)
else:
handlers = self.__broadcast_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for handler in handlers
)
def visualize_handlers(self) -> List[Dict[str, str]]:
""" """
可视化所有事件处理器,包括是否被禁用的状态 可视化所有事件处理器,包括是否被禁用的状态
:return: 处理器列表,包含处理器名称、类名和状态 :return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
""" """
handler_info = [] handler_info = []
with self.__lock: # 统一处理广播事件和链式事件
for event_type, handlers in self.__broadcast_subscribers.items(): for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
for handler in handlers: for handler_data in subscribers:
class_name = handler.__qualname__.split(".")[0] if isinstance(subscribers, dict):
status = ( priority, handler = handler_data
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" else:
) priority = None
handler_info.append({ handler = handler_data
"event_type": event_type.value, # 获取处理器的唯一标识符
"handler_name": handler.__name__, handler_id = self.__get_handler_identifier(handler)
"class_name": class_name, # 检查处理器的启用状态
"status": status status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
}) # 构建处理器信息字典
for event_type, handlers in self.__chain_subscribers.items(): handler_dict = {
for priority, handler in handlers: "event_type": event_type.value,
class_name = handler.__qualname__.split(".")[0] "handler_identifier": handler_id,
status = ( "status": status
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" }
) if priority is not None:
handler_info.append({ handler_dict["priority"] = priority
"event_type": event_type.value, handler_info.append(handler_dict)
"handler_name": handler.__name__,
"class_name": class_name,
"priority": priority,
"status": status
})
return handler_info return handler_info
def __trigger_chain_event(self, event: Event) -> Dict: @staticmethod
def __get_handler_identifier(target: Union[Callable, type]) -> str:
""" """
触发链式事件,按顺序调用订阅的处理器 获取处理器或处理器类的唯一标识符,包括模块名和类名
:param event: 处理的事件对象 :param target: 处理器函数或类
:return: 返回处理后的事件数据 :return: 唯一标识符
"""
if isinstance(target, type):
# 如果是类,使用模块名和类名
module_name = target.__module__
class_name = target.__qualname__
return f"{module_name}.{class_name}"
else:
# 如果是函数或方法,使用 inspect.getmodule 来获取模块名
module = inspect.getmodule(target)
module_name = module.__name__ if module else "unknown_module"
qualname = target.__qualname__
return f"{module_name}.{qualname}"
def __is_handler_enabled(self, handler: Callable) -> bool:
"""
检查处理器是否已启用(没有被禁用)
:param handler: 处理器函数
:return: 如果处理器启用则返回 True否则返回 False
"""
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
# 获取处理器所属类的唯一标识符
class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
return False
return True
def __trigger_chain_event(self, event: Event) -> Event:
"""
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
""" """
logger.debug(f"Triggering synchronous chain event: {event}") logger.debug(f"Triggering synchronous chain event: {event}")
self.__dispatch_chain_event(event) self.__dispatch_chain_event(event)
return event.event_data return event
def __trigger_broadcast_event(self, event: Event): def __trigger_broadcast_event(self, event: Event):
""" """
@@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton):
def __dispatch_chain_event(self, event: Event): def __dispatch_chain_event(self, event: Event):
""" """
同步方式调度链式事件,按优先级顺序逐个调用事件处理器 同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象 :param event: 要调度的事件对象
""" """
handlers = self.__chain_subscribers.get(event.event_type, []) handlers = self.__chain_subscribers.get(event.event_type, [])
self.__log_event_lifecycle(event, "started") self.__log_event_lifecycle(event, "started")
for priority, handler in handlers: for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0] start_time = time.time()
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: self.__safe_invoke_handler(handler, event)
start_time = time.time() logger.debug(
try: f"Handler {handler.__qualname__} (Priority: {priority}) "
handler(event.event_data) f"completed in {time.time() - start_time:.3f}s")
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
except Exception as e:
self.__handle_event_error(event, handler, e)
self.__log_event_lifecycle(event, "completed") self.__log_event_lifecycle(event, "completed")
def __dispatch_broadcast_event(self, event: Event): def __dispatch_broadcast_event(self, event: Event):
@@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton):
""" """
handlers = self.__broadcast_subscribers.get(event.event_type, []) handlers = self.__broadcast_subscribers.get(event.event_type, [])
for handler in handlers: for handler in handlers:
class_name = handler.__qualname__.split(".")[0] self.__executor.submit(self.__safe_invoke_handler, handler, event)
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
self.__executor.submit(self.__safe_invoke_handler, handler, event)
def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event): def __safe_invoke_handler(self, handler: Callable, event: Event):
""" """
安全调用事件处理器,捕获异常并记录日志 安全调用事件处理器,捕获异常并记录日志
:param handler: 要调用的处理器 :param handler: 要调用的处理器
:param event: 事件对象 :param event: 事件对象
""" """
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.")
return
try: try:
handler(event.event_data) handler(event)
except Exception as e: except Exception as e:
self.__handle_event_error(event, handler, e) self.__handle_event_error(event, handler, e)
@@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton):
- 或事件类型成员的列表 - 或事件类型成员的列表
""" """
def decorator(f: Callable[[Dict], None]): def decorator(f: Callable):
event_list = [] event_list = []
# 如果传入的是列表,处理每个事件类型 # 如果传入的是列表,处理每个事件类型

View File

@@ -7,7 +7,6 @@ import traceback
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
from app.helper.sites import SitesHelper
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer from watchdog.observers import Observer
@@ -18,6 +17,7 @@ from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper from app.db.systemconfig_oper import SystemConfigOper
from app.helper.module import ModuleHelper from app.helper.module import ModuleHelper
from app.helper.plugin import PluginHelper from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger from app.log import logger
from app.schemas.types import SystemConfigKey from app.schemas.types import SystemConfigKey
from app.utils.crypto import RSAUtils from app.utils.crypto import RSAUtils
@@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton):
# 未安装的不加载 # 未安装的不加载
if plugin_id not in installed_plugins: if plugin_id not in installed_plugins:
# 设置事件状态为不可用 # 设置事件状态为不可用
eventmanager.disable_event_handler(class_name=plugin_id) eventmanager.disable_event_handler(plugin)
continue continue
# 生成实例 # 生成实例
plugin_obj = plugin() plugin_obj = plugin()
@@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton):
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}") logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
# 启用的插件才设置事件注册状态可用 # 启用的插件才设置事件注册状态可用
if plugin_obj.get_state(): if plugin_obj.get_state():
eventmanager.enable_event_handler(class_name=plugin_id) eventmanager.enable_event_handler(plugin)
else: else:
eventmanager.disable_event_handler(class_name=plugin_id) eventmanager.disable_event_handler(plugin)
except Exception as err: except Exception as err:
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}") logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
@@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton):
:param plugin_id: 插件ID :param plugin_id: 插件ID
:param conf: 插件配置 :param conf: 插件配置
""" """
if not self._running_plugins.get(plugin_id): plugin = self._running_plugins.get(plugin_id)
if not plugin:
return return
self._running_plugins[plugin_id].init_plugin(conf) # 初始化插件
if self._running_plugins[plugin_id].get_state(): plugin.init_plugin(conf)
# 设置启用的插件事件注册状态可用 # 检查插件状态并启用/禁用事件处理器
eventmanager.enable_event_handler(class_name=plugin_id) if plugin.get_state():
# 启用插件类的事件处理器
eventmanager.enable_event_handler(type(plugin))
else: else:
# 设置事件状态为不可用 # 禁用插件类的事件处理器
eventmanager.disable_event_handler(class_name=plugin_id) eventmanager.disable_event_handler(type(plugin))
def stop(self, pid: str = None): def stop(self, pid: str = None):
""" """