From dea8fc5486d9ff741fe017967fad6f74bf296ba1 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 21:45:52 +0800 Subject: [PATCH] feat(event): optimized event execution flow --- app/core/event.py | 68 ++++++++++++++++++++++++++++------------------ app/core/plugin.py | 1 + app/main.py | 4 +-- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index e0c8cc97..c498f82c 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -43,6 +43,13 @@ class Event: event_kind = Event.get_event_kind(self.event_type) return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>" + def __lt__(self, other): + """ + 定义事件对象的比较规则,基于优先级比较 + 优先级小的事件会被认为“更小”,优先级高的事件将被认为“更大” + """ + return self.priority < other.priority + @staticmethod def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str: """ @@ -59,10 +66,10 @@ class EventManager(metaclass=Singleton): """ # 退出事件 - _event = threading.Event() + __event = threading.Event() def __init__(self): - self.messagehelper = MessageHelper() + self.__messagehelper = MessageHelper() self.__executor = ThreadHelper() # 动态线程池,用于消费事件 self.__consumer_threads = [] # 用于保存启动的事件消费者线程 self.__event_queue = PriorityQueue() # 优先级队列 @@ -78,7 +85,7 @@ class EventManager(metaclass=Singleton): 开始广播事件处理线程 """ # 启动消费者线程用于处理广播事件 - self._event.set() + self.__event.set() for _ in range(MIN_EVENT_CONSUMER_THREADS): thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True) thread.start() @@ -89,7 +96,7 @@ class EventManager(metaclass=Singleton): 停止广播事件处理线程 """ logger.info("正在停止事件处理...") - self._event.clear() # 停止广播事件处理 + self.__event.clear() # 停止广播事件处理 try: # 通过遍历保存的线程来等待它们完成 for consumer_thread in self.__consumer_threads: @@ -132,19 +139,28 @@ class EventManager(metaclass=Singleton): # 链式事件,按优先级排序 if event_type not in self.__chain_subscribers: self.__chain_subscribers[event_type] = {} - self.__chain_subscribers[event_type][handler_identifier] = (priority, handler) + handlers = self.__chain_subscribers[event_type] + if handler_identifier in handlers: + handlers.pop(handler_identifier) + else: + logger.debug( + f"Subscribed to chain event: {event_type.value}, " + f"Priority: {priority} - {handler_identifier}") + handlers[handler_identifier] = (priority, handler) # 根据优先级排序 self.__chain_subscribers[event_type] = dict( sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0]) ) - logger.debug( - f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}") else: # 广播事件 if event_type not in self.__broadcast_subscribers: self.__broadcast_subscribers[event_type] = {} - self.__broadcast_subscribers[event_type][handler_identifier] = handler - logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") + handlers = self.__broadcast_subscribers[event_type] + if handler_identifier in handlers: + handlers.pop(handler_identifier) + else: + logger.debug(f"Subscribed to broadcast event: {event_type.value} - {handler_identifier}") + handlers[handler_identifier] = handler def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable): """ @@ -157,10 +173,10 @@ class EventManager(metaclass=Singleton): if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers: self.__chain_subscribers[event_type].pop(handler_identifier, None) - logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}") + logger.debug(f"Unsubscribed from chain event: {event_type.value} - {handler_identifier}") elif event_type in self.__broadcast_subscribers: self.__broadcast_subscribers[event_type].pop(handler_identifier, None) - logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") + logger.debug(f"Unsubscribed from broadcast event: {event_type.value} - {handler_identifier}") def disable_event_handler(self, target: Union[Callable, type]): """ @@ -168,12 +184,14 @@ class EventManager(metaclass=Singleton): :param target: 处理器函数或类 """ identifier = self.__get_handler_identifier(target) + if identifier in self.__disabled_handlers or identifier in self.__disabled_classes: + return if isinstance(target, type): self.__disabled_classes.add(identifier) - logger.debug(f"Disabled event handler class: {identifier}") + logger.debug(f"Disabled event handler class - {identifier}") else: self.__disabled_handlers.add(identifier) - logger.debug(f"Disabled event handler: {identifier}") + logger.debug(f"Disabled event handler - {identifier}") def enable_event_handler(self, target: Union[Callable, type]): """ @@ -183,10 +201,10 @@ class EventManager(metaclass=Singleton): identifier = self.__get_handler_identifier(target) if isinstance(target, type): self.__disabled_classes.discard(identifier) - logger.debug(f"Enabled event handler class: {identifier}") + logger.debug(f"Enabled event handler class - {identifier}") else: self.__disabled_handlers.discard(identifier) - logger.debug(f"Enabled event handler: {identifier}") + logger.debug(f"Enabled event handler - {identifier}") def visualize_handlers(self) -> List[Dict]: """ @@ -283,7 +301,8 @@ class EventManager(metaclass=Singleton): start_time = time.time() self.__safe_invoke_handler(handler, event) logger.debug( - f"Handler {handler.__qualname__} (Priority: {priority}) , completed in {time.time() - start_time:.3f}s" + f"Handler {self.__get_handler_identifier(handler)} (Priority: {priority}) ," + f" completed in {time.time() - start_time:.3f}s" ) self.__log_event_lifecycle(event, "completed") @@ -305,7 +324,7 @@ class EventManager(metaclass=Singleton): :param event: 事件对象 """ if not self.__is_handler_enabled(handler): - logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution") + logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution") return # 根据事件类型判断是否需要深复制 @@ -365,16 +384,13 @@ class EventManager(metaclass=Singleton): """ 固定的后台广播消费者线程,持续从队列中提取事件 """ - while not self._event.is_set(): + while self.__event.is_set(): # 使用 Condition 优化队列的等待机制,避免频繁触发超时 with self.__condition: - # 当队列为空时,线程进入等待状态,直到有新事件到来 - while self.__event_queue.empty(): - # 阻塞等待,直到有事件插入 - self.__condition.wait() + # 阻塞等待,直到有事件插入 + self.__condition.wait() try: priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) - logger.debug(f"Fixed consumer processing event: {event}") self.__dispatch_broadcast_event(event) except Empty: logger.debug("Queue is empty, waiting for new events") @@ -395,9 +411,9 @@ class EventManager(metaclass=Singleton): names = handler.__qualname__.split(".") class_name, method_name = names[0], names[1] - self.messagehelper.put(title=f"{event.event_type} 事件处理出错", - message=f"{class_name}.{method_name}:{str(e)}", - role="system") + self.__messagehelper.put(title=f"{event.event_type} 事件处理出错", + message=f"{class_name}.{method_name}:{str(e)}", + role="system") self.send_event( EventType.SystemError, { diff --git a/app/core/plugin.py b/app/core/plugin.py index ff365006..dc03ff99 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -205,6 +205,7 @@ class PluginManager(metaclass=Singleton): for plugin_id, plugin in self._running_plugins.items(): if pid and plugin_id != pid: continue + eventmanager.disable_event_handler(type(plugin)) self.__stop_plugin(plugin) # 清空对像 if pid: diff --git a/app/main.py b/app/main.py index 668dbea6..803c6094 100644 --- a/app/main.py +++ b/app/main.py @@ -238,6 +238,8 @@ def start_module(): ResourceHelper() # 加载模块 ModuleManager() + # 启动事件消费 + EventManager().start() # 安装在线插件 PluginManager().sync() # 加载插件 @@ -248,8 +250,6 @@ def start_module(): Scheduler() # 加载命令 Command() - # 启动事件消费 - EventManager().start() # 初始化路由 init_routers() # 启动前端服务