feat(event): optimize handler

This commit is contained in:
InfinityPacer
2024-09-20 16:26:45 +08:00
parent 3bee5a8a86
commit be63e9ed15
2 changed files with 115 additions and 106 deletions

View File

@@ -1,3 +1,4 @@
import inspect
import threading
import time
import uuid
@@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton):
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_queue = PriorityQueue() # 优先级队列
self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[
ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器)
self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
@@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton):
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]:
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
"""
发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 ChainEventType)
@@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton):
else:
logger.error(f"Unknown event type: {etype}")
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None],
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
priority: int = DEFAULT_EVENT_PRIORITY):
"""
注册事件处理器,将处理器添加到对应的事件订阅列表中
@@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton):
self.__broadcast_subscribers[event_type].append(handler)
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]):
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
"""
移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 ChainEventType)
@@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton):
self.__broadcast_subscribers[event_type].remove(handler)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
def disable_event_handler(self, target: Union[Callable, type]):
"""
禁用指定名称的事件处理器或事件处理类,防止其响应事件
:param handler_name: 要禁用的事件处理器名称
:param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器
禁用指定的事件处理器或事件处理
:param target: 处理器函数或类
"""
if class_name:
self.__disabled_classes.add(class_name)
logger.debug(f"Disabled event handler class: {class_name}")
identifier = self.__get_handler_identifier(target)
if isinstance(target, type):
self.__disabled_classes.add(identifier)
logger.debug(f"Disabled event handler class: {identifier}")
else:
self.__disabled_handlers.add(handler_name)
logger.debug(f"Disabled event handler: {handler_name}")
self.__disabled_handlers.add(identifier)
logger.debug(f"Disabled event handler: {identifier}")
def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
def enable_event_handler(self, target: Union[Callable, type]):
"""
启用指定名称的事件处理器或事件处理类,使其可以继续响应事件
:param handler_name: 要启用的事件处理器名称
:param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器
启用指定的事件处理器或事件处理
:param target: 处理器函数或类
"""
if class_name:
self.__disabled_classes.discard(class_name)
logger.debug(f"Enabled event handler class: {class_name}")
identifier = self.__get_handler_identifier(target)
if isinstance(target, type):
self.__disabled_classes.discard(identifier)
logger.debug(f"Enabled event handler class: {identifier}")
else:
self.__disabled_handlers.discard(handler_name)
logger.debug(f"Enabled event handler: {handler_name}")
self.__disabled_handlers.discard(identifier)
logger.debug(f"Enabled event handler: {identifier}")
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
"""
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 ChainEventType)
:return: 返回是否存在可用的处理器
"""
if isinstance(etype, ChainEventType):
handlers = self.__chain_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for _, handler in handlers
)
else:
handlers = self.__broadcast_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for handler in handlers
)
def visualize_handlers(self) -> List[Dict[str, str]]:
def visualize_handlers(self) -> List[Dict]:
"""
可视化所有事件处理器,包括是否被禁用的状态
:return: 处理器列表,包含处理器名称、类名和状态
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
"""
handler_info = []
with self.__lock:
for event_type, handlers in self.__broadcast_subscribers.items():
for handler in handlers:
class_name = handler.__qualname__.split(".")[0]
status = (
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
)
handler_info.append({
"event_type": event_type.value,
"handler_name": handler.__name__,
"class_name": class_name,
"status": status
})
for event_type, handlers in self.__chain_subscribers.items():
for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0]
status = (
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
)
handler_info.append({
"event_type": event_type.value,
"handler_name": handler.__name__,
"class_name": class_name,
"priority": priority,
"status": status
})
# 统一处理广播事件和链式事件
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
for handler_data in subscribers:
if isinstance(subscribers, dict):
priority, handler = handler_data
else:
priority = None
handler = handler_data
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
# 检查处理器的启用状态
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
# 构建处理器信息字典
handler_dict = {
"event_type": event_type.value,
"handler_identifier": handler_id,
"status": status
}
if priority is not None:
handler_dict["priority"] = priority
handler_info.append(handler_dict)
return handler_info
def __trigger_chain_event(self, event: Event) -> Dict:
@staticmethod
def __get_handler_identifier(target: Union[Callable, type]) -> str:
"""
触发链式事件,按顺序调用订阅的处理器
:param event: 处理的事件对象
:return: 返回处理后的事件数据
获取处理器或处理器类的唯一标识符,包括模块名和类名
:param target: 处理器函数或类
:return: 唯一标识符
"""
if isinstance(target, type):
# 如果是类,使用模块名和类名
module_name = target.__module__
class_name = target.__qualname__
return f"{module_name}.{class_name}"
else:
# 如果是函数或方法,使用 inspect.getmodule 来获取模块名
module = inspect.getmodule(target)
module_name = module.__name__ if module else "unknown_module"
qualname = target.__qualname__
return f"{module_name}.{qualname}"
def __is_handler_enabled(self, handler: Callable) -> bool:
"""
检查处理器是否已启用(没有被禁用)
:param handler: 处理器函数
:return: 如果处理器启用则返回 True否则返回 False
"""
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
# 获取处理器所属类的唯一标识符
class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
return False
return True
def __trigger_chain_event(self, event: Event) -> Event:
"""
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
"""
logger.debug(f"Triggering synchronous chain event: {event}")
self.__dispatch_chain_event(event)
return event.event_data
return event
def __trigger_broadcast_event(self, event: Event):
"""
@@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton):
def __dispatch_chain_event(self, event: Event):
"""
同步方式调度链式事件,按优先级顺序逐个调用事件处理器
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, [])
self.__log_event_lifecycle(event, "started")
for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0]
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
start_time = time.time()
try:
handler(event.event_data)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
except Exception as e:
self.__handle_event_error(event, handler, e)
start_time = time.time()
self.__safe_invoke_handler(handler, event)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
self.__log_event_lifecycle(event, "completed")
def __dispatch_broadcast_event(self, event: Event):
@@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton):
"""
handlers = self.__broadcast_subscribers.get(event.event_type, [])
for handler in handlers:
class_name = handler.__qualname__.split(".")[0]
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
self.__executor.submit(self.__safe_invoke_handler, handler, event)
self.__executor.submit(self.__safe_invoke_handler, handler, event)
def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event):
def __safe_invoke_handler(self, handler: Callable, event: Event):
"""
安全调用事件处理器,捕获异常并记录日志
:param handler: 要调用的处理器
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.")
return
try:
handler(event.event_data)
handler(event)
except Exception as e:
self.__handle_event_error(event, handler, e)
@@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton):
- 或事件类型成员的列表
"""
def decorator(f: Callable[[Dict], None]):
def decorator(f: Callable):
event_list = []
# 如果传入的是列表,处理每个事件类型

View File

@@ -7,7 +7,6 @@ import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
from app.helper.sites import SitesHelper
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
@@ -18,6 +17,7 @@ from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.module import ModuleHelper
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.crypto import RSAUtils
@@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton):
# 未安装的不加载
if plugin_id not in installed_plugins:
# 设置事件状态为不可用
eventmanager.disable_event_handler(class_name=plugin_id)
eventmanager.disable_event_handler(plugin)
continue
# 生成实例
plugin_obj = plugin()
@@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton):
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
# 启用的插件才设置事件注册状态可用
if plugin_obj.get_state():
eventmanager.enable_event_handler(class_name=plugin_id)
eventmanager.enable_event_handler(plugin)
else:
eventmanager.disable_event_handler(class_name=plugin_id)
eventmanager.disable_event_handler(plugin)
except Exception as err:
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
@@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton):
:param plugin_id: 插件ID
:param conf: 插件配置
"""
if not self._running_plugins.get(plugin_id):
plugin = self._running_plugins.get(plugin_id)
if not plugin:
return
self._running_plugins[plugin_id].init_plugin(conf)
if self._running_plugins[plugin_id].get_state():
# 设置启用的插件事件注册状态可用
eventmanager.enable_event_handler(class_name=plugin_id)
# 初始化插件
plugin.init_plugin(conf)
# 检查插件状态并启用/禁用事件处理器
if plugin.get_state():
# 启用插件类的事件处理器
eventmanager.enable_event_handler(type(plugin))
else:
# 设置事件状态为不可用
eventmanager.disable_event_handler(class_name=plugin_id)
# 禁用插件类的事件处理器
eventmanager.disable_event_handler(type(plugin))
def stop(self, pid: str = None):
"""