From 857383c8d031993b183c6606064e78e7ccfde614 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 20:37:29 +0800 Subject: [PATCH] feat(event): improve event consumer logic for handling of events --- app/command.py | 92 +------------------------------ app/core/event.py | 138 ++++++++++++++++++++++++++++++++++++++-------- app/main.py | 7 ++- 3 files changed, 121 insertions(+), 116 deletions(-) diff --git a/app/command.py b/app/command.py index 6426046d..ab7d272a 100644 --- a/app/command.py +++ b/app/command.py @@ -1,8 +1,4 @@ -import copy -import importlib -import threading import traceback -from threading import Thread from typing import Any, Union, Dict from app.chain import ChainBase @@ -12,11 +8,9 @@ from app.chain.subscribe import SubscribeChain from app.chain.system import SystemChain from app.chain.transfer import TransferChain from app.core.config import settings -from app.core.event import Event as ManagerEvent -from app.core.event import eventmanager, EventManager +from app.core.event import Event as ManagerEvent, eventmanager from app.core.plugin import PluginManager from app.helper.message import MessageHelper -from app.helper.thread import ThreadHelper from app.log import logger from app.scheduler import Scheduler from app.schemas import Notification @@ -41,12 +35,7 @@ class Command(metaclass=Singleton): # 内建命令 _commands = {} - # 退出事件 - _event = threading.Event() - def __init__(self): - # 事件管理器 - self.eventmanager = EventManager() # 插件管理器 self.pluginmanager = PluginManager() # 处理链 @@ -55,8 +44,6 @@ class Command(metaclass=Singleton): self.scheduler = Scheduler() # 消息管理器 self.messagehelper = MessageHelper() - # 线程管理器 - self.threader = ThreadHelper() # 内置命令 self._commands = { "/cookiecloud": { @@ -172,72 +159,9 @@ class Command(metaclass=Singleton): # 广播注册命令菜单 if not settings.DEV: self.chain.register_commands(commands=self.get_commands()) - # 消息处理线程 - self._thread = Thread(target=self.__run) - # 启动事件处理线程 - self._thread.start() # 重启msg SystemChain().restart_finish() - def __run(self): - """ - 事件处理线程 - """ - while not self._event.is_set(): - event, handlers = self.eventmanager.get_event() - if event: - logger.info(f"处理事件:{event.event_type} - {handlers}") - if not handlers and event.event_callback: - event.event_callback() - for handler in handlers: - names = handler.__qualname__.split(".") - [class_name, method_name] = names - try: - if class_name in self.pluginmanager.get_plugin_ids(): - # 插件事件 - result = self.threader.submit( - self.pluginmanager.run_plugin_method, - class_name, method_name, copy.deepcopy(event) - ) - if event.event_callback: - event.event_callback(result) - else: - # 检查全局变量中是否存在 - if class_name not in globals(): - # 导入模块,除了插件和Command本身,只有chain能响应事件 - try: - module = importlib.import_module( - f"app.chain.{class_name[:-5].lower()}" - ) - class_obj = getattr(module, class_name)() - except Exception as e: - logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") - continue - else: - # 通过类名创建类实例 - class_obj = globals()[class_name]() - # 检查类是否存在并调用方法 - if hasattr(class_obj, method_name): - self.threader.submit( - getattr(class_obj, method_name), - copy.deepcopy(event) - ) - except Exception as e: - logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") - self.messagehelper.put(title=f"{event.event_type} 事件处理出错", - message=f"{class_name}.{method_name}:{str(e)}", - role="system") - self.eventmanager.send_event( - EventType.SystemError, - { - "type": "event", - "event_type": event.event_type, - "event_handle": f"{class_name}.{method_name}", - "error": str(e), - "traceback": traceback.format_exc() - } - ) - def __run_command(self, command: Dict[str, any], data_str: str = "", channel: MessageChannel = None, source: str = None, userid: Union[str, int] = None): """ @@ -292,18 +216,6 @@ class Command(metaclass=Singleton): # 没有参数 command['func']() - def stop(self): - """ - 停止事件处理线程 - """ - logger.info("正在停止事件处理...") - self._event.set() - try: - self._thread.join() - logger.info("事件处理停止完成") - except Exception as e: - logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}") - def get_commands(self): """ 获取命令列表 @@ -361,7 +273,7 @@ class Command(metaclass=Singleton): """ 发送插件命令 """ - EventManager().send_event(etype, data) + eventmanager.send_event(etype, data) @eventmanager.register(EventType.CommandExcute) def command_event(self, event: ManagerEvent) -> None: diff --git a/app/core/event.py b/app/core/event.py index 03c3914b..e0c8cc97 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,10 +1,14 @@ +import copy +import importlib import inspect import threading import time +import traceback import uuid from queue import PriorityQueue, Empty from typing import Callable, Dict, List, Union, Optional +from app.helper.message import MessageHelper from app.helper.thread import ThreadHelper from app.log import logger from app.schemas.types import EventType, ChainEventType @@ -12,8 +16,7 @@ from app.utils.singleton import Singleton DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数 -MAX_EVENT_WORKER_POOL_SIZE = 50 # 最大事件工作线程池大小 -EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的超时时间(秒) +EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒) class Event: @@ -55,11 +58,13 @@ class EventManager(metaclass=Singleton): EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件 """ - def __init__(self, max_workers: int = MAX_EVENT_WORKER_POOL_SIZE): - """ - :param max_workers: 线程池最大工作线程数 - """ - self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 + # 退出事件 + _event = threading.Event() + + def __init__(self): + self.messagehelper = MessageHelper() + self.__executor = ThreadHelper() # 动态线程池,用于消费事件 + self.__consumer_threads = [] # 用于保存启动的事件消费者线程 self.__event_queue = PriorityQueue() # 优先级队列 self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者 self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者 @@ -68,9 +73,30 @@ class EventManager(metaclass=Singleton): self.__lock = threading.Lock() # 线程锁 self.__condition = threading.Condition(self.__lock) # 条件变量 + def start(self): + """ + 开始广播事件处理线程 + """ # 启动消费者线程用于处理广播事件 + self._event.set() for _ in range(MIN_EVENT_CONSUMER_THREADS): - threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() + thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True) + thread.start() + self.__consumer_threads.append(thread) # 将线程对象保存到列表中 + + def stop(self): + """ + 停止广播事件处理线程 + """ + logger.info("正在停止事件处理...") + self._event.clear() # 停止广播事件处理 + try: + # 通过遍历保存的线程来等待它们完成 + for consumer_thread in self.__consumer_threads: + consumer_thread.join() + logger.info("事件处理停止完成") + except Exception as e: + logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}") def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: @@ -250,14 +276,15 @@ class EventManager(metaclass=Singleton): :param event: 要调度的事件对象 """ handlers = self.__chain_subscribers.get(event.event_type, {}) + if not handlers: + return self.__log_event_lifecycle(event, "started") for handler_id, (priority, handler) in handlers.items(): start_time = time.time() self.__safe_invoke_handler(handler, event) logger.debug( - f"Handler {handler.__qualname__} (Priority: {priority}) " - f"completed in {time.time() - start_time:.3f}s") - + f"Handler {handler.__qualname__} (Priority: {priority}) , completed in {time.time() - start_time:.3f}s" + ) self.__log_event_lifecycle(event, "completed") def __dispatch_broadcast_event(self, event: Event): @@ -266,41 +293,91 @@ class EventManager(metaclass=Singleton): :param event: 要调度的事件对象 """ handlers = self.__broadcast_subscribers.get(event.event_type, {}) + if not handlers: + return for handler_id, handler in handlers.items(): self.__executor.submit(self.__safe_invoke_handler, handler, event) def __safe_invoke_handler(self, handler: Callable, event: Event): """ - 安全调用事件处理器,捕获异常并记录日志 - :param handler: 要调用的处理器 + 调用处理器,处理链式或广播事件 + :param handler: 处理器 :param event: 事件对象 """ if not self.__is_handler_enabled(handler): - logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.") + logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution") return + + # 根据事件类型判断是否需要深复制 + is_broadcast_event = isinstance(event.event_type, EventType) + event_to_process = copy.deepcopy(event) if is_broadcast_event else event + + names = handler.__qualname__.split(".") + class_name, method_name = names[0], names[1] + try: - handler(event) + from app.core.plugin import PluginManager + + if class_name in PluginManager().get_plugin_ids(): + # 定义一个插件调用函数 + def plugin_callable(): + PluginManager().run_plugin_method(class_name, method_name, event_to_process) + + if is_broadcast_event: + self.__executor.submit(plugin_callable) + else: + plugin_callable() + else: + # 获取全局对象或模块类的实例 + class_obj = self.__get_class_instance(class_name) + if class_obj and hasattr(class_obj, method_name): + method = getattr(class_obj, method_name) + if is_broadcast_event: + self.__executor.submit(method, event_to_process) + else: + method(event_to_process) except Exception as e: self.__handle_event_error(event, handler, e) + @staticmethod + def __get_class_instance(class_name: str): + """ + 根据类名获取类实例,首先检查全局变量中是否存在该类,如果不存在则尝试动态导入模块。 + :param class_name: 类的名称 + :return: 类的实例 + """ + # 检查类是否在全局变量中 + if class_name in globals(): + class_obj = globals()[class_name]() + else: + # 如果类不在全局变量中,尝试动态导入模块并创建实例 + # 导入模块,除了插件和Command,只有chain能响应事件 + try: + module = importlib.import_module(f"app.chain.{class_name[:-5].lower()}") + class_obj = getattr(module, class_name)() + except Exception as e: + logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") + return None + + return class_obj + def __fixed_broadcast_consumer(self): """ 固定的后台广播消费者线程,持续从队列中提取事件 """ - while True: + while not self._event.is_set(): # 使用 Condition 优化队列的等待机制,避免频繁触发超时 with self.__condition: # 当队列为空时,线程进入等待状态,直到有新事件到来 while self.__event_queue.empty(): # 阻塞等待,直到有事件插入 self.__condition.wait() - try: priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) logger.debug(f"Fixed consumer processing event: {event}") self.__dispatch_broadcast_event(event) except Empty: - logger.debug("Queue is empty, waiting for new events.") + logger.debug("Queue is empty, waiting for new events") @staticmethod def __log_event_lifecycle(event: Event, stage: str): @@ -309,15 +386,28 @@ class EventManager(metaclass=Singleton): """ logger.debug(f"{stage} - {event}") - @staticmethod - def __handle_event_error(event: Event, handler: Callable, error: Exception): + def __handle_event_error(self, event: Event, handler: Callable, e: Exception): """ 全局错误处理器,用于处理事件处理中的异常 """ - logger.error( - f"Global error handler: Event {event.event_type.value} failed in handler {handler.__name__}: {str(error)}") - # 可以将错误事件重新发送到事件队列或执行其他逻辑 - # eventmanager.send_event(EventType.SystemError, {"error": str(error), "event_id": event.event_id}) + logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") + + names = handler.__qualname__.split(".") + class_name, method_name = names[0], names[1] + + self.messagehelper.put(title=f"{event.event_type} 事件处理出错", + message=f"{class_name}.{method_name}:{str(e)}", + role="system") + self.send_event( + EventType.SystemError, + { + "type": "event", + "event_type": event.event_type, + "event_handle": f"{class_name}.{method_name}", + "error": str(e), + "traceback": traceback.format_exc() + } + ) def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]): """ diff --git a/app/main.py b/app/main.py index 94d5bdf8..668dbea6 100644 --- a/app/main.py +++ b/app/main.py @@ -30,6 +30,7 @@ except ImportError as e: print(error_message, file=sys.stderr) sys.exit(1) +from app.core.event import EventManager from app.core.plugin import PluginManager from app.db.init import init_db, update_db from app.helper.thread import ThreadHelper @@ -212,7 +213,7 @@ def shutdown_server(): PluginManager().stop() PluginManager().stop_monitor() # 停止事件消费 - Command().stop() + EventManager().stop() # 停止虚拟显示 DisplayHelper().stop() # 停止定时服务 @@ -245,8 +246,10 @@ def start_module(): Monitor() # 启动定时服务 Scheduler() - # 启动事件消费 + # 加载命令 Command() + # 启动事件消费 + EventManager().start() # 初始化路由 init_routers() # 启动前端服务