From 49b6052ab0a92c2ffbdc73c1a394be4d9f4ab973 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Thu, 19 Sep 2024 23:55:24 +0800 Subject: [PATCH 1/9] refactor(event): optimize broadcast and chain event --- app/core/event.py | 321 ++++++++++++++++++++++++++++--------------- app/schemas/types.py | 10 +- 2 files changed, 220 insertions(+), 111 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index f3e79015..31343e9c 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,150 +1,251 @@ import threading -import typing -from queue import Queue, Empty -from typing import Dict, Any +import uuid +from enum import Enum +from queue import PriorityQueue, Empty +from typing import Callable, Dict, List, Union, Optional +from app.helper.thread import ThreadHelper from app.log import logger +from app.schemas.types import EventType, SyncEventType from app.utils.singleton import Singleton -from app.schemas.types import EventType + + +class Event: + """ + 事件类,封装事件的基本信息 + """ + + def __init__(self, event_type: Union[EventType, SyncEventType], event_data: Optional[Dict] = None, + priority: int = 10): + """ + :param event_type: 事件的类型,支持 EventType 或 SyncEventType + :param event_data: 可选,事件携带的数据,默认为空字典 + :param priority: 可选,广播事件的优先级,默认为 10 + """ + self.event_id = str(uuid.uuid4()) # 事件ID + self.event_type = event_type # 事件类型 + self.event_data = event_data or {} # 事件数据 + self.priority = priority # 事件优先级 + + def __repr__(self) -> str: + """ + 重写 __repr__ 方法,用于返回事件的详细信息,包括事件类型、事件ID和优先级 + """ + event_kind = Event.get_event_kind(self.event_type) + return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>" + + @staticmethod + def get_event_kind(event_type: Union[EventType, SyncEventType]) -> str: + """ + 根据事件类型判断事件是广播事件还是链式事件 + :param event_type: 事件类型,支持 EventType 或 SyncEventType + :return: 返回 Broadcast Event 或 Chain Event + """ + return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event" class EventManager(metaclass=Singleton): """ - 事件管理器 + EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件 """ - def __init__(self): - # 事件队列 - self._eventQueue = Queue() - # 事件响应函数字典 - self._handlers: Dict[str, Dict[str, Any]] = {} - # 已禁用的事件响应 - self._disabled_handlers = [] + def __init__(self, max_workers: int = 50): + """ + :param max_workers: 线程池最大工作线程数,默认 50 + """ + self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 + self.__event_executor = ThreadHelper(max_workers=3) # 动态线程池,用于处理事件 + self.__event_queue = PriorityQueue() # 优先级队列 + self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表 + self.__disabled_handlers = set() # 禁用的事件处理器集合 + self.__lock = threading.Lock() # 线程锁 + self.__dynamic_consuming = False # 标记是否已经在使用动态线程池 - def get_event(self): - """ - 获取事件 - """ - try: - event = self._eventQueue.get(block=True, timeout=1) - handlers = self._handlers.get(event.event_type) or {} - if handlers: - # 去除掉被禁用的事件响应 - handlerList = [handler for handler in handlers.values() - if handler.__qualname__.split(".")[0] not in self._disabled_handlers] - return event, handlerList - return event, [] - except Empty: - return None, [] + # 启动消费者线程用于处理异步事件 + threading.Thread(target=self.__fixed_consumer, daemon=True).start() - def check(self, etype: EventType): + def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, priority: int = 10) -> \ + Optional[Dict]: """ - 检查事件是否存在响应,去除掉被禁用的事件响应 + 发送事件,根据事件类型决定是广播事件还是链式事件 + :param etype: 事件类型 (EventType 或 SyncEventType) + :param data: 可选,事件数据 + :param priority: 广播事件的优先级,默认为 10 + :return: 如果是链式事件,返回处理后的事件数据;否则返回 None """ - if etype.value not in self._handlers: - return False - handlers = self._handlers.get(etype.value) - return any([handler for handler in handlers.values() - if handler.__qualname__.split(".")[0] not in self._disabled_handlers]) - - def add_event_listener(self, etype: EventType, handler: type): - """ - 注册事件处理 - """ - try: - handlers = self._handlers[etype.value] - except KeyError: - handlers = {} - self._handlers[etype.value] = handlers - if handler.__qualname__ in handlers: - handlers.pop(handler.__qualname__) + event = Event(etype, data, priority if isinstance(etype, EventType) else None) + if isinstance(etype, EventType): + self.__trigger_event_async(event, priority) + elif isinstance(etype, SyncEventType): + return self.__trigger_event(event) else: - logger.debug(f"Event Registed:{etype.value} - {handler.__qualname__}") - handlers[handler.__qualname__] = handler + logger.error(f"Unknown event type: {etype}") - def disable_events_hander(self, class_name: str): + def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]) -> None: """ - 标记对应类事件处理为不可用 + 注册事件处理器,将处理器添加到对应的事件订阅列表中 + :param event_type: 事件类型 (EventType 或 SyncEventType) + :param handler: 处理器 """ - if class_name not in self._disabled_handlers: - self._disabled_handlers.append(class_name) - logger.debug(f"Event Disabled:{class_name}") + with self.__lock: + if event_type not in self.__subscribers: + self.__subscribers[event_type] = [] + self.__subscribers[event_type].append(handler) + event_kind = Event.get_event_kind(event_type) + logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") - def enable_events_hander(self, class_name: str): + def remove_event_listener(self, event_type: Union[EventType, SyncEventType], + handler: Callable[[Dict], None]) -> None: """ - 标记对应类事件处理为可用 + 移除事件处理器,将处理器从对应事件的订阅列表中删除 + :param event_type: 事件类型 (EventType 或 SyncEventType) + :param handler: 要移除的处理器 """ - if class_name in self._disabled_handlers: - self._disabled_handlers.remove(class_name) - logger.debug(f"Event Enabled:{class_name}") + with self.__lock: + if event_type in self.__subscribers: + self.__subscribers[event_type].remove(handler) + event_kind = Event.get_event_kind(event_type) + logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") - def send_event(self, etype: EventType, data: dict = None, callback: typing.Callable = None): + def disable_event_handler(self, handler_name: str) -> None: """ - 发送事件(异步响应) + 禁用指定名称的事件处理器,防止其响应事件 + :param handler_name: 要禁用的事件处理器名称 """ - if etype not in EventType: - return - event = Event(etype.value) - event.event_data = data or {} - event.event_callback = callback - logger.debug(f"发送事件:{etype.value} - data:{event.event_data},callback:{callback}") - self._eventQueue.put(event) + self.__disabled_handlers.add(handler_name) + logger.debug(f"Disabled event handler: {handler_name}") - def send_event_sync(self, etype: EventType, data: dict = None): + def enable_event_handler(self, handler_name: str) -> None: """ - 发送事件(同步响应) + 启用指定名称的事件处理器,使其可以继续响应事件 + :param handler_name: 要启用的事件处理器名称 """ - result: any = None - condition = threading.Condition() + self.__disabled_handlers.discard(handler_name) + logger.debug(f"Enabled event handler: {handler_name}") - def callback(res): - nonlocal result - if res: - result = res.result() - with condition: - condition.notify_all() - - thread = threading.Thread(target=self.send_event, args=(etype, data, callback)) - thread.start() - - with condition: - condition.wait() - - return result - - def register(self, etype: [EventType, list]): + def check(self, etype: Union[EventType, SyncEventType]) -> bool: """ - 事件注册 - :param etype: 事件类型 + 检查是否有启用的事件处理器可以响应某个事件类型 + :param etype: 事件类型 (EventType 或 SyncEventType) + :return: 返回是否存在可用的处理器 + """ + if etype not in self.__subscribers: + return False + handlers = self.__subscribers.get(etype, []) + return any(handler.__name__ not in self.__disabled_handlers for handler in handlers) + + def __trigger_event(self, event: Event) -> Dict: + """ + 触发链式事件,按顺序调用订阅的处理器 + :param event: 要处理的事件对象 + :return: 返回处理后的事件数据 + """ + logger.debug(f"Triggering synchronous chain event: {event}") + self.__dispatch_event(event) + return event.event_data + + def __trigger_event_async(self, event: Event, priority: int) -> None: + """ + 触发广播事件,将事件插入到优先级队列中 + :param event: 要处理的事件对象 + :param priority: 事件的优先级 + """ + logger.debug(f"Triggering asynchronous broadcast event: {event}") + self.__event_queue.put((priority, event)) + + # 当固定消费者无法及时处理时,动态启动线程池 + if self.__event_queue.qsize() > 10 and not self.__dynamic_consuming: + self.__dynamic_consuming = True + self.__event_executor.submit(self.__dynamic_consumer) + + def __dispatch_event(self, event: Event) -> None: + """ + 同步方式调度事件,逐个调用事件处理器 + :param event: 要调度的事件对象 + """ + handlers = self.__subscribers.get(event.event_type, []) + for handler in handlers: + if handler.__name__ not in self.__disabled_handlers: + handler(event.event_data) + + def __dispatch_event_async(self, event: Event) -> None: + """ + 异步方式调度事件,通过线程池逐个调用事件处理器 + :param event: 要调度的事件对象 + """ + handlers = self.__subscribers.get(event.event_type, []) + for handler in handlers: + if handler.__name__ not in self.__disabled_handlers: + self.__executor.submit(handler, event.event_data) + + def __fixed_consumer(self) -> None: + """ + 固定的后台消费者线程,持续从队列中提取事件处理 + 该线程始终保持运行状态,确保即使事件量少时也有线程在消费 + """ + while True: + try: + # 阻塞方式从队列获取事件 + priority, event = self.__event_queue.get(block=True, timeout=1) + logger.debug(f"Fixed consumer processing event: {event}") + self.__dispatch_event_async(event) # 调用事件处理器 + except Empty: + continue # 如果队列为空,继续等待 + + def __dynamic_consumer(self) -> None: + """ + 动态消费者线程,通过线程池调度,用于在事件量大时进行扩展 + 一旦队列为空,则结束动态消费,并重置动态消费标志 + """ + while True: + try: + # 非阻塞方式从队列获取事件 + priority, event = self.__event_queue.get(block=False) + logger.debug(f"Dynamic consumer processing event: {event}") + self.__dispatch_event_async(event) # 调用事件处理器 + except Empty: + self.__dynamic_consuming = False # 队列为空,结束动态消费 + break + + def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]): + """ + 事件注册装饰器,用于将函数注册为事件的处理器 + :param etype: + - 单个事件类型成员 (如 EventType.MetadataScrape, SyncEventType.PluginAction) + - 事件类型类 (EventType, SyncEventType) + - 或事件类型成员的列表 """ - def decorator(f): + def decorator(f: Callable[[Dict], None]): + event_list = [] + + # 如果传入的是列表,处理每个事件类型 if isinstance(etype, list): for et in etype: - self.add_event_listener(et, f) - elif type(etype) == type(EventType): - for et in etype.__members__.values(): - self.add_event_listener(et, f) + if isinstance(et, (EventType, SyncEventType)): + event_list.append(et) + else: + raise ValueError(f"列表中无效的事件类型: {et}") + + # 如果传入的是 EventType 或 SyncEventType 类,提取该类中的所有成员 + elif isinstance(etype, type) and issubclass(etype, Enum): + event_list.extend(etype.__members__.values()) + + # 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 SyncEventType.PluginAction) + elif isinstance(etype, (EventType, SyncEventType)): + event_list.append(etype) + else: - self.add_event_listener(etype, f) + raise ValueError(f"无效的事件类型: {etype}") + + # 统一注册事件 + for event in event_list: + self.add_event_listener(event, f) + return f return decorator -class Event(object): - """ - 事件对象 - """ - - def __init__(self, event_type=None): - # 事件类型 - self.event_type = event_type - # 字典用于保存具体的事件数据 - self.event_data = {} - # 事件完成后回调函数 - self.event_callback = None - - -# 实例引用,用于注册事件 +# 全局实例定义 eventmanager = EventManager() diff --git a/app/schemas/types.py b/app/schemas/types.py index 41b24f15..87dd8815 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -14,7 +14,7 @@ class TorrentStatus(Enum): DOWNLOADING = "下载中" -# 可监听事件 +# 异步广播事件 class EventType(Enum): # 插件需要重载 PluginReload = "plugin.reload" @@ -56,6 +56,14 @@ class EventType(Enum): MetadataScrape = "metadata.scrape" +# 同步链式事件 +class SyncEventType(Enum): + # 刮削元数据 + MetadataScrape = "metadata.scrape" + # 插件动作 + PluginAction = "plugin.action" + + # 系统配置Key字典 class SystemConfigKey(Enum): # 下载器配置 From e786120e98f448fd6d6e6f64dabdb89d26805da1 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 00:25:38 +0800 Subject: [PATCH 2/9] feat(event): update constant and support Condition for thread --- app/core/event.py | 85 +++++++++++++++++++++-------------------------- 1 file changed, 38 insertions(+), 47 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index 31343e9c..9f20069e 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -9,6 +9,11 @@ from app.log import logger from app.schemas.types import EventType, SyncEventType from app.utils.singleton import Singleton +DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 +MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数 +MAX_EVENT_WORKER_POOL_SIZE = 50 # 最大事件工作线程池大小 +EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的超时时间(秒) + class Event: """ @@ -49,23 +54,23 @@ class EventManager(metaclass=Singleton): EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件 """ - def __init__(self, max_workers: int = 50): + def __init__(self, max_workers: int = MAX_EVENT_WORKER_POOL_SIZE): """ - :param max_workers: 线程池最大工作线程数,默认 50 + :param max_workers: 线程池最大工作线程数 """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 - self.__event_executor = ThreadHelper(max_workers=3) # 动态线程池,用于处理事件 self.__event_queue = PriorityQueue() # 优先级队列 self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表 self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__lock = threading.Lock() # 线程锁 - self.__dynamic_consuming = False # 标记是否已经在使用动态线程池 + self.__condition = threading.Condition(self.__lock) # 条件变量 # 启动消费者线程用于处理异步事件 - threading.Thread(target=self.__fixed_consumer, daemon=True).start() + for _ in range(MIN_EVENT_CONSUMER_THREADS): + threading.Thread(target=self.__fixed_consumer, daemon=True).start() - def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, priority: int = 10) -> \ - Optional[Dict]: + def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, + priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]: """ 发送事件,根据事件类型决定是广播事件还是链式事件 :param etype: 事件类型 (EventType 或 SyncEventType) @@ -73,15 +78,17 @@ class EventManager(metaclass=Singleton): :param priority: 广播事件的优先级,默认为 10 :return: 如果是链式事件,返回处理后的事件数据;否则返回 None """ - event = Event(etype, data, priority if isinstance(etype, EventType) else None) + event = Event(etype, data, priority) if isinstance(etype, EventType): - self.__trigger_event_async(event, priority) + self.__trigger_event_async(event) + with self.__condition: + self.__condition.notify() elif isinstance(etype, SyncEventType): return self.__trigger_event(event) else: logger.error(f"Unknown event type: {etype}") - def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]) -> None: + def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]): """ 注册事件处理器,将处理器添加到对应的事件订阅列表中 :param event_type: 事件类型 (EventType 或 SyncEventType) @@ -95,7 +102,7 @@ class EventManager(metaclass=Singleton): logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") def remove_event_listener(self, event_type: Union[EventType, SyncEventType], - handler: Callable[[Dict], None]) -> None: + handler: Callable[[Dict], None]): """ 移除事件处理器,将处理器从对应事件的订阅列表中删除 :param event_type: 事件类型 (EventType 或 SyncEventType) @@ -107,7 +114,7 @@ class EventManager(metaclass=Singleton): event_kind = Event.get_event_kind(event_type) logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") - def disable_event_handler(self, handler_name: str) -> None: + def disable_event_handler(self, handler_name: str): """ 禁用指定名称的事件处理器,防止其响应事件 :param handler_name: 要禁用的事件处理器名称 @@ -115,7 +122,7 @@ class EventManager(metaclass=Singleton): self.__disabled_handlers.add(handler_name) logger.debug(f"Disabled event handler: {handler_name}") - def enable_event_handler(self, handler_name: str) -> None: + def enable_event_handler(self, handler_name: str): """ 启用指定名称的事件处理器,使其可以继续响应事件 :param handler_name: 要启用的事件处理器名称 @@ -144,21 +151,15 @@ class EventManager(metaclass=Singleton): self.__dispatch_event(event) return event.event_data - def __trigger_event_async(self, event: Event, priority: int) -> None: + def __trigger_event_async(self, event: Event): """ 触发广播事件,将事件插入到优先级队列中 :param event: 要处理的事件对象 - :param priority: 事件的优先级 """ logger.debug(f"Triggering asynchronous broadcast event: {event}") - self.__event_queue.put((priority, event)) + self.__event_queue.put((event.priority, event)) - # 当固定消费者无法及时处理时,动态启动线程池 - if self.__event_queue.qsize() > 10 and not self.__dynamic_consuming: - self.__dynamic_consuming = True - self.__event_executor.submit(self.__dynamic_consumer) - - def __dispatch_event(self, event: Event) -> None: + def __dispatch_event(self, event: Event): """ 同步方式调度事件,逐个调用事件处理器 :param event: 要调度的事件对象 @@ -168,7 +169,7 @@ class EventManager(metaclass=Singleton): if handler.__name__ not in self.__disabled_handlers: handler(event.event_data) - def __dispatch_event_async(self, event: Event) -> None: + def __dispatch_event_async(self, event: Event): """ 异步方式调度事件,通过线程池逐个调用事件处理器 :param event: 要调度的事件对象 @@ -178,34 +179,24 @@ class EventManager(metaclass=Singleton): if handler.__name__ not in self.__disabled_handlers: self.__executor.submit(handler, event.event_data) - def __fixed_consumer(self) -> None: + def __fixed_consumer(self): """ - 固定的后台消费者线程,持续从队列中提取事件处理 - 该线程始终保持运行状态,确保即使事件量少时也有线程在消费 + 固定的后台消费者线程,持续从队列中提取事件 """ while True: - try: - # 阻塞方式从队列获取事件 - priority, event = self.__event_queue.get(block=True, timeout=1) - logger.debug(f"Fixed consumer processing event: {event}") - self.__dispatch_event_async(event) # 调用事件处理器 - except Empty: - continue # 如果队列为空,继续等待 + # 使用 Condition 优化队列的等待机制,避免频繁触发超时 + with self.__condition: + # 当队列为空时,线程进入等待状态,直到有新事件到来 + while self.__event_queue.empty(): + # 阻塞等待,直到有事件插入 + self.__condition.wait() - def __dynamic_consumer(self) -> None: - """ - 动态消费者线程,通过线程池调度,用于在事件量大时进行扩展 - 一旦队列为空,则结束动态消费,并重置动态消费标志 - """ - while True: - try: - # 非阻塞方式从队列获取事件 - priority, event = self.__event_queue.get(block=False) - logger.debug(f"Dynamic consumer processing event: {event}") - self.__dispatch_event_async(event) # 调用事件处理器 - except Empty: - self.__dynamic_consuming = False # 队列为空,结束动态消费 - break + try: + priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) + logger.debug(f"Fixed consumer processing event: {event}") + self.__dispatch_event_async(event) + except Empty: + logger.debug("Queue is empty, waiting for new events.") def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]): """ From 85cb9f7cd7f0f869c5c789c65753602e66d28548 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 01:34:01 +0800 Subject: [PATCH 3/9] feat(event): add visualization and enhance handler --- app/core/event.py | 81 ++++++++++++++++++++++++++++++++++++++-------- app/core/plugin.py | 12 +++---- 2 files changed, 74 insertions(+), 19 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index 9f20069e..d58319fb 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -62,6 +62,7 @@ class EventManager(metaclass=Singleton): self.__event_queue = PriorityQueue() # 优先级队列 self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表 self.__disabled_handlers = set() # 禁用的事件处理器集合 + self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 self.__condition = threading.Condition(self.__lock) # 条件变量 @@ -114,21 +115,31 @@ class EventManager(metaclass=Singleton): event_kind = Event.get_event_kind(event_type) logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") - def disable_event_handler(self, handler_name: str): + def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None): """ - 禁用指定名称的事件处理器,防止其响应事件 + 禁用指定名称的事件处理器或事件处理类,防止其响应事件 :param handler_name: 要禁用的事件处理器名称 + :param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器 """ - self.__disabled_handlers.add(handler_name) - logger.debug(f"Disabled event handler: {handler_name}") + if class_name: + self.__disabled_classes.add(class_name) + logger.debug(f"Disabled event handler class: {class_name}") + else: + self.__disabled_handlers.add(handler_name) + logger.debug(f"Disabled event handler: {handler_name}") - def enable_event_handler(self, handler_name: str): + def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None): """ - 启用指定名称的事件处理器,使其可以继续响应事件 + 启用指定名称的事件处理器或事件处理类,使其可以继续响应事件 :param handler_name: 要启用的事件处理器名称 + :param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器 """ - self.__disabled_handlers.discard(handler_name) - logger.debug(f"Enabled event handler: {handler_name}") + if class_name: + self.__disabled_classes.discard(class_name) + logger.debug(f"Enabled event handler class: {class_name}") + else: + self.__disabled_handlers.discard(handler_name) + logger.debug(f"Enabled event handler: {handler_name}") def check(self, etype: Union[EventType, SyncEventType]) -> bool: """ @@ -139,7 +150,34 @@ class EventManager(metaclass=Singleton): if etype not in self.__subscribers: return False handlers = self.__subscribers.get(etype, []) - return any(handler.__name__ not in self.__disabled_handlers for handler in handlers) + return any( + handler.__name__ not in self.__disabled_handlers and + handler.__qualname__.split(".")[0] not in self.__disabled_classes + for handler in handlers + ) + + def visualize_handlers(self) -> List[Dict[str, str]]: + """ + 可视化所有事件处理器,包括是否被禁用的状态 + :return: 处理器列表,包含处理器名称、类名和状态 + """ + handler_info = [] + with (self.__lock): + for event_type, handlers in self.__subscribers.items(): + for handler in handlers: + class_name = handler.__qualname__.split(".")[0] + status = ( + "disabled" + if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes + else "enabled" + ) + handler_info.append({ + "event_type": event_type.value, + "handler_name": handler.__name__, + "class_name": class_name, + "status": status + }) + return handler_info def __trigger_event(self, event: Event) -> Dict: """ @@ -166,8 +204,12 @@ class EventManager(metaclass=Singleton): """ handlers = self.__subscribers.get(event.event_type, []) for handler in handlers: - if handler.__name__ not in self.__disabled_handlers: - handler(event.event_data) + class_name = handler.__qualname__.split(".")[0] + if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: + try: + handler(event.event_data) + except Exception as e: + logger.error(f"Error handling event {event.event_type}: {str(e)}", exc_info=True) def __dispatch_event_async(self, event: Event): """ @@ -176,8 +218,21 @@ class EventManager(metaclass=Singleton): """ handlers = self.__subscribers.get(event.event_type, []) for handler in handlers: - if handler.__name__ not in self.__disabled_handlers: - self.__executor.submit(handler, event.event_data) + class_name = handler.__qualname__.split(".")[0] + if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: + self.__executor.submit(self.__safe_invoke_handler, handler, event) + + @staticmethod + def __safe_invoke_handler(handler: Callable[[Dict], None], event: Event): + """ + 安全调用事件处理器,捕获异常并记录日志 + :param handler: 要调用的处理器 + :param event: 事件对象 + """ + try: + handler(event.event_data) + except Exception as e: + logger.error(f"Error in asynchronous handler {handler.__name__}: {str(e)}", exc_info=True) def __fixed_consumer(self): """ diff --git a/app/core/plugin.py b/app/core/plugin.py index 8def04a5..315510fa 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -7,6 +7,7 @@ import traceback from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type +from app.helper.sites import SitesHelper from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -17,7 +18,6 @@ from app.db.plugindata_oper import PluginDataOper from app.db.systemconfig_oper import SystemConfigOper from app.helper.module import ModuleHelper from app.helper.plugin import PluginHelper -from app.helper.sites import SitesHelper from app.log import logger from app.schemas.types import SystemConfigKey from app.utils.crypto import RSAUtils @@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton): # 未安装的不加载 if plugin_id not in installed_plugins: # 设置事件状态为不可用 - eventmanager.disable_events_hander(plugin_id) + eventmanager.disable_event_handler(class_name=plugin_id) continue # 生成实例 plugin_obj = plugin() @@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton): logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}") # 启用的插件才设置事件注册状态可用 if plugin_obj.get_state(): - eventmanager.enable_events_hander(plugin_id) + eventmanager.enable_event_handler(class_name=plugin_id) else: - eventmanager.disable_events_hander(plugin_id) + eventmanager.disable_event_handler(class_name=plugin_id) except Exception as err: logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}") @@ -184,10 +184,10 @@ class PluginManager(metaclass=Singleton): self._running_plugins[plugin_id].init_plugin(conf) if self._running_plugins[plugin_id].get_state(): # 设置启用的插件事件注册状态可用 - eventmanager.enable_events_hander(plugin_id) + eventmanager.enable_event_handler(class_name=plugin_id) else: # 设置事件状态为不可用 - eventmanager.disable_events_hander(plugin_id) + eventmanager.disable_event_handler(class_name=plugin_id) def stop(self, pid: str = None): """ From 3bee5a8a866786a45392c906bbd4443b96d4fc10 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 13:52:09 +0800 Subject: [PATCH 4/9] feat(event): separate implementation of broadcast and chain --- app/core/event.py | 196 +++++++++++++++++++++++++++---------------- app/schemas/types.py | 7 +- 2 files changed, 127 insertions(+), 76 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index d58319fb..a2f93e8f 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,12 +1,12 @@ import threading +import time import uuid -from enum import Enum from queue import PriorityQueue, Empty from typing import Callable, Dict, List, Union, Optional from app.helper.thread import ThreadHelper from app.log import logger -from app.schemas.types import EventType, SyncEventType +from app.schemas.types import EventType, ChainEventType from app.utils.singleton import Singleton DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 @@ -20,12 +20,12 @@ class Event: 事件类,封装事件的基本信息 """ - def __init__(self, event_type: Union[EventType, SyncEventType], event_data: Optional[Dict] = None, - priority: int = 10): + def __init__(self, event_type: Union[EventType, ChainEventType], event_data: Optional[Dict] = None, + priority: int = DEFAULT_EVENT_PRIORITY): """ - :param event_type: 事件的类型,支持 EventType 或 SyncEventType + :param event_type: 事件的类型,支持 EventType 或 ChainEventType :param event_data: 可选,事件携带的数据,默认为空字典 - :param priority: 可选,广播事件的优先级,默认为 10 + :param priority: 可选,事件的优先级,默认为 10 """ self.event_id = str(uuid.uuid4()) # 事件ID self.event_type = event_type # 事件类型 @@ -40,10 +40,10 @@ class Event: return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>" @staticmethod - def get_event_kind(event_type: Union[EventType, SyncEventType]) -> str: + def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str: """ 根据事件类型判断事件是广播事件还是链式事件 - :param event_type: 事件类型,支持 EventType 或 SyncEventType + :param event_type: 事件类型,支持 EventType 或 ChainEventType :return: 返回 Broadcast Event 或 Chain Event """ return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event" @@ -60,60 +60,75 @@ class EventManager(metaclass=Singleton): """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__event_queue = PriorityQueue() # 优先级队列 - self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表 + self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者 + self.__chain_subscribers: Dict[ + ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器) self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 self.__condition = threading.Condition(self.__lock) # 条件变量 - # 启动消费者线程用于处理异步事件 + # 启动消费者线程用于处理广播事件 for _ in range(MIN_EVENT_CONSUMER_THREADS): - threading.Thread(target=self.__fixed_consumer, daemon=True).start() + threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() - def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, + def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]: """ 发送事件,根据事件类型决定是广播事件还是链式事件 - :param etype: 事件类型 (EventType 或 SyncEventType) + :param etype: 事件类型 (EventType 或 ChainEventType) :param data: 可选,事件数据 :param priority: 广播事件的优先级,默认为 10 :return: 如果是链式事件,返回处理后的事件数据;否则返回 None """ event = Event(etype, data, priority) if isinstance(etype, EventType): - self.__trigger_event_async(event) + self.__trigger_broadcast_event(event) with self.__condition: self.__condition.notify() - elif isinstance(etype, SyncEventType): - return self.__trigger_event(event) + elif isinstance(etype, ChainEventType): + return self.__trigger_chain_event(event) else: logger.error(f"Unknown event type: {etype}") - def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]): + def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None], + priority: int = DEFAULT_EVENT_PRIORITY): """ 注册事件处理器,将处理器添加到对应的事件订阅列表中 - :param event_type: 事件类型 (EventType 或 SyncEventType) + :param event_type: 事件类型 (EventType 或 ChainEventType) :param handler: 处理器 + :param priority: 可选,事件的优先级,默认为 10 """ with self.__lock: - if event_type not in self.__subscribers: - self.__subscribers[event_type] = [] - self.__subscribers[event_type].append(handler) - event_kind = Event.get_event_kind(event_type) - logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") + if isinstance(event_type, ChainEventType): + # 链式事件,按优先级排序 + if event_type not in self.__chain_subscribers: + self.__chain_subscribers[event_type] = [] + self.__chain_subscribers[event_type].append((priority, handler)) + self.__chain_subscribers[event_type].sort(key=lambda x: x[0]) # 按优先级排序 + logger.debug( + f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}") + else: + # 广播事件 + if event_type not in self.__broadcast_subscribers: + self.__broadcast_subscribers[event_type] = [] + self.__broadcast_subscribers[event_type].append(handler) + logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") - def remove_event_listener(self, event_type: Union[EventType, SyncEventType], - handler: Callable[[Dict], None]): + def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]): """ 移除事件处理器,将处理器从对应事件的订阅列表中删除 - :param event_type: 事件类型 (EventType 或 SyncEventType) + :param event_type: 事件类型 (EventType 或 ChainEventType) :param handler: 要移除的处理器 """ with self.__lock: - if event_type in self.__subscribers: - self.__subscribers[event_type].remove(handler) - event_kind = Event.get_event_kind(event_type) - logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") + if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers: + self.__chain_subscribers[event_type] = [h for h in self.__chain_subscribers[event_type] if + h[1] != handler] + logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}") + elif event_type in self.__broadcast_subscribers: + self.__broadcast_subscribers[event_type].remove(handler) + logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None): """ @@ -141,20 +156,26 @@ class EventManager(metaclass=Singleton): self.__disabled_handlers.discard(handler_name) logger.debug(f"Enabled event handler: {handler_name}") - def check(self, etype: Union[EventType, SyncEventType]) -> bool: + def check(self, etype: Union[EventType, ChainEventType]) -> bool: """ 检查是否有启用的事件处理器可以响应某个事件类型 - :param etype: 事件类型 (EventType 或 SyncEventType) + :param etype: 事件类型 (EventType 或 ChainEventType) :return: 返回是否存在可用的处理器 """ - if etype not in self.__subscribers: - return False - handlers = self.__subscribers.get(etype, []) - return any( - handler.__name__ not in self.__disabled_handlers and - handler.__qualname__.split(".")[0] not in self.__disabled_classes - for handler in handlers - ) + if isinstance(etype, ChainEventType): + handlers = self.__chain_subscribers.get(etype, []) + return any( + handler.__name__ not in self.__disabled_handlers and + handler.__qualname__.split(".")[0] not in self.__disabled_classes + for _, handler in handlers + ) + else: + handlers = self.__broadcast_subscribers.get(etype, []) + return any( + handler.__name__ not in self.__disabled_handlers and + handler.__qualname__.split(".")[0] not in self.__disabled_classes + for handler in handlers + ) def visualize_handlers(self) -> List[Dict[str, str]]: """ @@ -162,14 +183,12 @@ class EventManager(metaclass=Singleton): :return: 处理器列表,包含处理器名称、类名和状态 """ handler_info = [] - with (self.__lock): - for event_type, handlers in self.__subscribers.items(): + with self.__lock: + for event_type, handlers in self.__broadcast_subscribers.items(): for handler in handlers: class_name = handler.__qualname__.split(".")[0] status = ( - "disabled" - if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes - else "enabled" + "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" ) handler_info.append({ "event_type": event_type.value, @@ -177,53 +196,71 @@ class EventManager(metaclass=Singleton): "class_name": class_name, "status": status }) + for event_type, handlers in self.__chain_subscribers.items(): + for priority, handler in handlers: + class_name = handler.__qualname__.split(".")[0] + status = ( + "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" + ) + handler_info.append({ + "event_type": event_type.value, + "handler_name": handler.__name__, + "class_name": class_name, + "priority": priority, + "status": status + }) return handler_info - def __trigger_event(self, event: Event) -> Dict: + def __trigger_chain_event(self, event: Event) -> Dict: """ 触发链式事件,按顺序调用订阅的处理器 :param event: 要处理的事件对象 :return: 返回处理后的事件数据 """ logger.debug(f"Triggering synchronous chain event: {event}") - self.__dispatch_event(event) + self.__dispatch_chain_event(event) return event.event_data - def __trigger_event_async(self, event: Event): + def __trigger_broadcast_event(self, event: Event): """ 触发广播事件,将事件插入到优先级队列中 :param event: 要处理的事件对象 """ - logger.debug(f"Triggering asynchronous broadcast event: {event}") + logger.debug(f"Triggering broadcast event: {event}") self.__event_queue.put((event.priority, event)) - def __dispatch_event(self, event: Event): + def __dispatch_chain_event(self, event: Event): """ - 同步方式调度事件,逐个调用事件处理器 + 同步方式调度链式事件,按优先级顺序逐个调用事件处理器 :param event: 要调度的事件对象 """ - handlers = self.__subscribers.get(event.event_type, []) - for handler in handlers: + handlers = self.__chain_subscribers.get(event.event_type, []) + self.__log_event_lifecycle(event, "started") + for priority, handler in handlers: class_name = handler.__qualname__.split(".")[0] if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: + start_time = time.time() try: handler(event.event_data) + logger.debug( + f"Handler {handler.__qualname__} (Priority: {priority}) " + f"completed in {time.time() - start_time:.3f}s") except Exception as e: - logger.error(f"Error handling event {event.event_type}: {str(e)}", exc_info=True) + self.__handle_event_error(event, handler, e) + self.__log_event_lifecycle(event, "completed") - def __dispatch_event_async(self, event: Event): + def __dispatch_broadcast_event(self, event: Event): """ - 异步方式调度事件,通过线程池逐个调用事件处理器 + 异步方式调度广播事件,通过线程池逐个调用事件处理器 :param event: 要调度的事件对象 """ - handlers = self.__subscribers.get(event.event_type, []) + handlers = self.__broadcast_subscribers.get(event.event_type, []) for handler in handlers: class_name = handler.__qualname__.split(".")[0] if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: self.__executor.submit(self.__safe_invoke_handler, handler, event) - @staticmethod - def __safe_invoke_handler(handler: Callable[[Dict], None], event: Event): + def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event): """ 安全调用事件处理器,捕获异常并记录日志 :param handler: 要调用的处理器 @@ -232,11 +269,11 @@ class EventManager(metaclass=Singleton): try: handler(event.event_data) except Exception as e: - logger.error(f"Error in asynchronous handler {handler.__name__}: {str(e)}", exc_info=True) + self.__handle_event_error(event, handler, e) - def __fixed_consumer(self): + def __fixed_broadcast_consumer(self): """ - 固定的后台消费者线程,持续从队列中提取事件 + 固定的后台广播消费者线程,持续从队列中提取事件 """ while True: # 使用 Condition 优化队列的等待机制,避免频繁触发超时 @@ -249,16 +286,33 @@ class EventManager(metaclass=Singleton): try: priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) logger.debug(f"Fixed consumer processing event: {event}") - self.__dispatch_event_async(event) + self.__dispatch_broadcast_event(event) except Empty: logger.debug("Queue is empty, waiting for new events.") - def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]): + @staticmethod + def __log_event_lifecycle(event: Event, stage: str): + """ + 记录事件的生命周期日志 + """ + logger.debug(f"{stage} - {event}") + + @staticmethod + def __handle_event_error(event: Event, handler: Callable, error: Exception): + """ + 全局错误处理器,用于处理事件处理中的异常 + """ + logger.error( + f"Global error handler: Event {event.event_type.value} failed in handler {handler.__name__}: {str(error)}") + # 可以将错误事件重新发送到事件队列或执行其他逻辑 + # eventmanager.send_event(EventType.SystemError, {"error": str(error), "event_id": event.event_id}) + + def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]): """ 事件注册装饰器,用于将函数注册为事件的处理器 :param etype: - - 单个事件类型成员 (如 EventType.MetadataScrape, SyncEventType.PluginAction) - - 事件类型类 (EventType, SyncEventType) + - 单个事件类型成员 (如 EventType.MetadataScrape, ChainEventType.PluginAction) + - 事件类型类 (EventType, ChainEventType) - 或事件类型成员的列表 """ @@ -268,17 +322,17 @@ class EventManager(metaclass=Singleton): # 如果传入的是列表,处理每个事件类型 if isinstance(etype, list): for et in etype: - if isinstance(et, (EventType, SyncEventType)): + if isinstance(et, (EventType, ChainEventType)): event_list.append(et) else: raise ValueError(f"列表中无效的事件类型: {et}") - # 如果传入的是 EventType 或 SyncEventType 类,提取该类中的所有成员 - elif isinstance(etype, type) and issubclass(etype, Enum): + # 如果传入的是 EventType 或 ChainEventType 类,提取该类中的所有成员 + elif isinstance(etype, type) and issubclass(etype, (EventType, ChainEventType)): event_list.extend(etype.__members__.values()) - # 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 SyncEventType.PluginAction) - elif isinstance(etype, (EventType, SyncEventType)): + # 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 ChainEventType.PluginAction) + elif isinstance(etype, (EventType, ChainEventType)): event_list.append(etype) else: diff --git a/app/schemas/types.py b/app/schemas/types.py index 87dd8815..5dc30960 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -57,11 +57,8 @@ class EventType(Enum): # 同步链式事件 -class SyncEventType(Enum): - # 刮削元数据 - MetadataScrape = "metadata.scrape" - # 插件动作 - PluginAction = "plugin.action" +class ChainEventType(Enum): + pass # 系统配置Key字典 From be63e9ed1504c8a8e2b0eca92a44e8af62c71ce4 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 16:26:45 +0800 Subject: [PATCH 5/9] feat(event): optimize handler --- app/core/event.py | 196 +++++++++++++++++++++++---------------------- app/core/plugin.py | 25 +++--- 2 files changed, 115 insertions(+), 106 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index a2f93e8f..cbdb378a 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,3 +1,4 @@ +import inspect import threading import time import uuid @@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton): """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__event_queue = PriorityQueue() # 优先级队列 - self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者 - self.__chain_subscribers: Dict[ - ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器) + self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者 + self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者 self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 @@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton): threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, - priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]: + priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: """ 发送事件,根据事件类型决定是广播事件还是链式事件 :param etype: 事件类型 (EventType 或 ChainEventType) @@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton): else: logger.error(f"Unknown event type: {etype}") - def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None], + def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable, priority: int = DEFAULT_EVENT_PRIORITY): """ 注册事件处理器,将处理器添加到对应的事件订阅列表中 @@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton): self.__broadcast_subscribers[event_type].append(handler) logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") - def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]): + def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable): """ 移除事件处理器,将处理器从对应事件的订阅列表中删除 :param event_type: 事件类型 (EventType 或 ChainEventType) @@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton): self.__broadcast_subscribers[event_type].remove(handler) logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") - def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None): + def disable_event_handler(self, target: Union[Callable, type]): """ - 禁用指定名称的事件处理器或事件处理类,防止其响应事件 - :param handler_name: 要禁用的事件处理器名称 - :param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器 + 禁用指定的事件处理器或事件处理器类 + :param target: 处理器函数或类 """ - if class_name: - self.__disabled_classes.add(class_name) - logger.debug(f"Disabled event handler class: {class_name}") + identifier = self.__get_handler_identifier(target) + if isinstance(target, type): + self.__disabled_classes.add(identifier) + logger.debug(f"Disabled event handler class: {identifier}") else: - self.__disabled_handlers.add(handler_name) - logger.debug(f"Disabled event handler: {handler_name}") + self.__disabled_handlers.add(identifier) + logger.debug(f"Disabled event handler: {identifier}") - def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None): + def enable_event_handler(self, target: Union[Callable, type]): """ - 启用指定名称的事件处理器或事件处理类,使其可以继续响应事件 - :param handler_name: 要启用的事件处理器名称 - :param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器 + 启用指定的事件处理器或事件处理器类 + :param target: 处理器函数或类 """ - if class_name: - self.__disabled_classes.discard(class_name) - logger.debug(f"Enabled event handler class: {class_name}") + identifier = self.__get_handler_identifier(target) + if isinstance(target, type): + self.__disabled_classes.discard(identifier) + logger.debug(f"Enabled event handler class: {identifier}") else: - self.__disabled_handlers.discard(handler_name) - logger.debug(f"Enabled event handler: {handler_name}") + self.__disabled_handlers.discard(identifier) + logger.debug(f"Enabled event handler: {identifier}") - def check(self, etype: Union[EventType, ChainEventType]) -> bool: - """ - 检查是否有启用的事件处理器可以响应某个事件类型 - :param etype: 事件类型 (EventType 或 ChainEventType) - :return: 返回是否存在可用的处理器 - """ - if isinstance(etype, ChainEventType): - handlers = self.__chain_subscribers.get(etype, []) - return any( - handler.__name__ not in self.__disabled_handlers and - handler.__qualname__.split(".")[0] not in self.__disabled_classes - for _, handler in handlers - ) - else: - handlers = self.__broadcast_subscribers.get(etype, []) - return any( - handler.__name__ not in self.__disabled_handlers and - handler.__qualname__.split(".")[0] not in self.__disabled_classes - for handler in handlers - ) - - def visualize_handlers(self) -> List[Dict[str, str]]: + def visualize_handlers(self) -> List[Dict]: """ 可视化所有事件处理器,包括是否被禁用的状态 - :return: 处理器列表,包含处理器名称、类名和状态 + :return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态 """ handler_info = [] - with self.__lock: - for event_type, handlers in self.__broadcast_subscribers.items(): - for handler in handlers: - class_name = handler.__qualname__.split(".")[0] - status = ( - "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" - ) - handler_info.append({ - "event_type": event_type.value, - "handler_name": handler.__name__, - "class_name": class_name, - "status": status - }) - for event_type, handlers in self.__chain_subscribers.items(): - for priority, handler in handlers: - class_name = handler.__qualname__.split(".")[0] - status = ( - "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" - ) - handler_info.append({ - "event_type": event_type.value, - "handler_name": handler.__name__, - "class_name": class_name, - "priority": priority, - "status": status - }) + # 统一处理广播事件和链式事件 + for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items(): + for handler_data in subscribers: + if isinstance(subscribers, dict): + priority, handler = handler_data + else: + priority = None + handler = handler_data + # 获取处理器的唯一标识符 + handler_id = self.__get_handler_identifier(handler) + # 检查处理器的启用状态 + status = "enabled" if self.__is_handler_enabled(handler) else "disabled" + # 构建处理器信息字典 + handler_dict = { + "event_type": event_type.value, + "handler_identifier": handler_id, + "status": status + } + if priority is not None: + handler_dict["priority"] = priority + handler_info.append(handler_dict) return handler_info - def __trigger_chain_event(self, event: Event) -> Dict: + @staticmethod + def __get_handler_identifier(target: Union[Callable, type]) -> str: """ - 触发链式事件,按顺序调用订阅的处理器 - :param event: 要处理的事件对象 - :return: 返回处理后的事件数据 + 获取处理器或处理器类的唯一标识符,包括模块名和类名 + :param target: 处理器函数或类 + :return: 唯一标识符 + """ + if isinstance(target, type): + # 如果是类,使用模块名和类名 + module_name = target.__module__ + class_name = target.__qualname__ + return f"{module_name}.{class_name}" + else: + # 如果是函数或方法,使用 inspect.getmodule 来获取模块名 + module = inspect.getmodule(target) + module_name = module.__name__ if module else "unknown_module" + qualname = target.__qualname__ + return f"{module_name}.{qualname}" + + def __is_handler_enabled(self, handler: Callable) -> bool: + """ + 检查处理器是否已启用(没有被禁用) + :param handler: 处理器函数 + :return: 如果处理器启用则返回 True,否则返回 False + """ + # 获取处理器的唯一标识符 + handler_id = self.__get_handler_identifier(handler) + + # 获取处理器所属类的唯一标识符 + class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None + + # 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False + if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes): + return False + + return True + + def __trigger_chain_event(self, event: Event) -> Event: + """ + 触发链式事件,按顺序调用订阅的处理器,并记录处理耗时 """ logger.debug(f"Triggering synchronous chain event: {event}") self.__dispatch_chain_event(event) - return event.event_data + return event def __trigger_broadcast_event(self, event: Event): """ @@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton): def __dispatch_chain_event(self, event: Event): """ - 同步方式调度链式事件,按优先级顺序逐个调用事件处理器 + 同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间 :param event: 要调度的事件对象 """ handlers = self.__chain_subscribers.get(event.event_type, []) self.__log_event_lifecycle(event, "started") for priority, handler in handlers: - class_name = handler.__qualname__.split(".")[0] - if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: - start_time = time.time() - try: - handler(event.event_data) - logger.debug( - f"Handler {handler.__qualname__} (Priority: {priority}) " - f"completed in {time.time() - start_time:.3f}s") - except Exception as e: - self.__handle_event_error(event, handler, e) + start_time = time.time() + self.__safe_invoke_handler(handler, event) + logger.debug( + f"Handler {handler.__qualname__} (Priority: {priority}) " + f"completed in {time.time() - start_time:.3f}s") + self.__log_event_lifecycle(event, "completed") def __dispatch_broadcast_event(self, event: Event): @@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton): """ handlers = self.__broadcast_subscribers.get(event.event_type, []) for handler in handlers: - class_name = handler.__qualname__.split(".")[0] - if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: - self.__executor.submit(self.__safe_invoke_handler, handler, event) + self.__executor.submit(self.__safe_invoke_handler, handler, event) - def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event): + def __safe_invoke_handler(self, handler: Callable, event: Event): """ 安全调用事件处理器,捕获异常并记录日志 :param handler: 要调用的处理器 :param event: 事件对象 """ + if not self.__is_handler_enabled(handler): + logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.") + return try: - handler(event.event_data) + handler(event) except Exception as e: self.__handle_event_error(event, handler, e) @@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton): - 或事件类型成员的列表 """ - def decorator(f: Callable[[Dict], None]): + def decorator(f: Callable): event_list = [] # 如果传入的是列表,处理每个事件类型 diff --git a/app/core/plugin.py b/app/core/plugin.py index 315510fa..ff365006 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -7,7 +7,6 @@ import traceback from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type -from app.helper.sites import SitesHelper from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -18,6 +17,7 @@ from app.db.plugindata_oper import PluginDataOper from app.db.systemconfig_oper import SystemConfigOper from app.helper.module import ModuleHelper from app.helper.plugin import PluginHelper +from app.helper.sites import SitesHelper from app.log import logger from app.schemas.types import SystemConfigKey from app.utils.crypto import RSAUtils @@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton): # 未安装的不加载 if plugin_id not in installed_plugins: # 设置事件状态为不可用 - eventmanager.disable_event_handler(class_name=plugin_id) + eventmanager.disable_event_handler(plugin) continue # 生成实例 plugin_obj = plugin() @@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton): logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}") # 启用的插件才设置事件注册状态可用 if plugin_obj.get_state(): - eventmanager.enable_event_handler(class_name=plugin_id) + eventmanager.enable_event_handler(plugin) else: - eventmanager.disable_event_handler(class_name=plugin_id) + eventmanager.disable_event_handler(plugin) except Exception as err: logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}") @@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton): :param plugin_id: 插件ID :param conf: 插件配置 """ - if not self._running_plugins.get(plugin_id): + plugin = self._running_plugins.get(plugin_id) + if not plugin: return - self._running_plugins[plugin_id].init_plugin(conf) - if self._running_plugins[plugin_id].get_state(): - # 设置启用的插件事件注册状态可用 - eventmanager.enable_event_handler(class_name=plugin_id) + # 初始化插件 + plugin.init_plugin(conf) + # 检查插件状态并启用/禁用事件处理器 + if plugin.get_state(): + # 启用插件类的事件处理器 + eventmanager.enable_event_handler(type(plugin)) else: - # 设置事件状态为不可用 - eventmanager.disable_event_handler(class_name=plugin_id) + # 禁用插件类的事件处理器 + eventmanager.disable_event_handler(type(plugin)) def stop(self, pid: str = None): """ From ef0768ec4484c6f6f00a6fe865605a9cdc4da4cd Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 16:32:36 +0800 Subject: [PATCH 6/9] feat(event): simplify register decorator --- app/core/event.py | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index cbdb378a..157d14bc 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -323,30 +323,22 @@ class EventManager(metaclass=Singleton): """ def decorator(f: Callable): - event_list = [] - - # 如果传入的是列表,处理每个事件类型 + # 将输入的事件类型统一转换为列表格式 if isinstance(etype, list): - for et in etype: - if isinstance(et, (EventType, ChainEventType)): - event_list.append(et) - else: - raise ValueError(f"列表中无效的事件类型: {et}") - - # 如果传入的是 EventType 或 ChainEventType 类,提取该类中的所有成员 - elif isinstance(etype, type) and issubclass(etype, (EventType, ChainEventType)): - event_list.extend(etype.__members__.values()) - - # 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 ChainEventType.PluginAction) - elif isinstance(etype, (EventType, ChainEventType)): - event_list.append(etype) - + event_list = etype # 传入的已经是列表,直接使用 else: - raise ValueError(f"无效的事件类型: {etype}") + event_list = [etype] # 不是列表则包裹成单一元素的列表 - # 统一注册事件 + # 遍历列表,处理每个事件类型 for event in event_list: - self.add_event_listener(event, f) + if isinstance(event, (EventType, ChainEventType)): + self.add_event_listener(event, f) + elif isinstance(event, type) and issubclass(event, (EventType, ChainEventType)): + # 如果是 EventType 或 ChainEventType 类,提取该类中的所有成员 + for et in event.__members__.values(): + self.add_event_listener(et, f) + else: + raise ValueError(f"无效的事件类型: {event}") return f From 688693b31f576aa0a4aaaf2ec0cb8f93986e8e0b Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 18:42:29 +0800 Subject: [PATCH 7/9] feat(event): use dict for subscribers and replace handler if exists --- app/core/event.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index 157d14bc..03c3914b 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -61,8 +61,8 @@ class EventManager(metaclass=Singleton): """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__event_queue = PriorityQueue() # 优先级队列 - self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者 - self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者 + self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者 + self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者 self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 @@ -97,22 +97,27 @@ class EventManager(metaclass=Singleton): 注册事件处理器,将处理器添加到对应的事件订阅列表中 :param event_type: 事件类型 (EventType 或 ChainEventType) :param handler: 处理器 - :param priority: 可选,事件的优先级,默认为 10 + :param priority: 可选,链式事件的优先级,默认为 10;广播事件不需要优先级 """ with self.__lock: + handler_identifier = self.__get_handler_identifier(handler) + if isinstance(event_type, ChainEventType): # 链式事件,按优先级排序 if event_type not in self.__chain_subscribers: - self.__chain_subscribers[event_type] = [] - self.__chain_subscribers[event_type].append((priority, handler)) - self.__chain_subscribers[event_type].sort(key=lambda x: x[0]) # 按优先级排序 + self.__chain_subscribers[event_type] = {} + self.__chain_subscribers[event_type][handler_identifier] = (priority, handler) + # 根据优先级排序 + self.__chain_subscribers[event_type] = dict( + sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0]) + ) logger.debug( f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}") else: # 广播事件 if event_type not in self.__broadcast_subscribers: - self.__broadcast_subscribers[event_type] = [] - self.__broadcast_subscribers[event_type].append(handler) + self.__broadcast_subscribers[event_type] = {} + self.__broadcast_subscribers[event_type][handler_identifier] = handler logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable): @@ -122,12 +127,13 @@ class EventManager(metaclass=Singleton): :param handler: 要移除的处理器 """ with self.__lock: + handler_identifier = self.__get_handler_identifier(handler) + if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers: - self.__chain_subscribers[event_type] = [h for h in self.__chain_subscribers[event_type] if - h[1] != handler] + self.__chain_subscribers[event_type].pop(handler_identifier, None) logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}") elif event_type in self.__broadcast_subscribers: - self.__broadcast_subscribers[event_type].remove(handler) + self.__broadcast_subscribers[event_type].pop(handler_identifier, None) logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") def disable_event_handler(self, target: Union[Callable, type]): @@ -243,9 +249,9 @@ class EventManager(metaclass=Singleton): 同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间 :param event: 要调度的事件对象 """ - handlers = self.__chain_subscribers.get(event.event_type, []) + handlers = self.__chain_subscribers.get(event.event_type, {}) self.__log_event_lifecycle(event, "started") - for priority, handler in handlers: + for handler_id, (priority, handler) in handlers.items(): start_time = time.time() self.__safe_invoke_handler(handler, event) logger.debug( @@ -259,8 +265,8 @@ class EventManager(metaclass=Singleton): 异步方式调度广播事件,通过线程池逐个调用事件处理器 :param event: 要调度的事件对象 """ - handlers = self.__broadcast_subscribers.get(event.event_type, []) - for handler in handlers: + handlers = self.__broadcast_subscribers.get(event.event_type, {}) + for handler_id, handler in handlers.items(): self.__executor.submit(self.__safe_invoke_handler, handler, event) def __safe_invoke_handler(self, handler: Callable, event: Event): From 857383c8d031993b183c6606064e78e7ccfde614 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 20:37:29 +0800 Subject: [PATCH 8/9] feat(event): improve event consumer logic for handling of events --- app/command.py | 92 +------------------------------ app/core/event.py | 138 ++++++++++++++++++++++++++++++++++++++-------- app/main.py | 7 ++- 3 files changed, 121 insertions(+), 116 deletions(-) diff --git a/app/command.py b/app/command.py index 6426046d..ab7d272a 100644 --- a/app/command.py +++ b/app/command.py @@ -1,8 +1,4 @@ -import copy -import importlib -import threading import traceback -from threading import Thread from typing import Any, Union, Dict from app.chain import ChainBase @@ -12,11 +8,9 @@ from app.chain.subscribe import SubscribeChain from app.chain.system import SystemChain from app.chain.transfer import TransferChain from app.core.config import settings -from app.core.event import Event as ManagerEvent -from app.core.event import eventmanager, EventManager +from app.core.event import Event as ManagerEvent, eventmanager from app.core.plugin import PluginManager from app.helper.message import MessageHelper -from app.helper.thread import ThreadHelper from app.log import logger from app.scheduler import Scheduler from app.schemas import Notification @@ -41,12 +35,7 @@ class Command(metaclass=Singleton): # 内建命令 _commands = {} - # 退出事件 - _event = threading.Event() - def __init__(self): - # 事件管理器 - self.eventmanager = EventManager() # 插件管理器 self.pluginmanager = PluginManager() # 处理链 @@ -55,8 +44,6 @@ class Command(metaclass=Singleton): self.scheduler = Scheduler() # 消息管理器 self.messagehelper = MessageHelper() - # 线程管理器 - self.threader = ThreadHelper() # 内置命令 self._commands = { "/cookiecloud": { @@ -172,72 +159,9 @@ class Command(metaclass=Singleton): # 广播注册命令菜单 if not settings.DEV: self.chain.register_commands(commands=self.get_commands()) - # 消息处理线程 - self._thread = Thread(target=self.__run) - # 启动事件处理线程 - self._thread.start() # 重启msg SystemChain().restart_finish() - def __run(self): - """ - 事件处理线程 - """ - while not self._event.is_set(): - event, handlers = self.eventmanager.get_event() - if event: - logger.info(f"处理事件:{event.event_type} - {handlers}") - if not handlers and event.event_callback: - event.event_callback() - for handler in handlers: - names = handler.__qualname__.split(".") - [class_name, method_name] = names - try: - if class_name in self.pluginmanager.get_plugin_ids(): - # 插件事件 - result = self.threader.submit( - self.pluginmanager.run_plugin_method, - class_name, method_name, copy.deepcopy(event) - ) - if event.event_callback: - event.event_callback(result) - else: - # 检查全局变量中是否存在 - if class_name not in globals(): - # 导入模块,除了插件和Command本身,只有chain能响应事件 - try: - module = importlib.import_module( - f"app.chain.{class_name[:-5].lower()}" - ) - class_obj = getattr(module, class_name)() - except Exception as e: - logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") - continue - else: - # 通过类名创建类实例 - class_obj = globals()[class_name]() - # 检查类是否存在并调用方法 - if hasattr(class_obj, method_name): - self.threader.submit( - getattr(class_obj, method_name), - copy.deepcopy(event) - ) - except Exception as e: - logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") - self.messagehelper.put(title=f"{event.event_type} 事件处理出错", - message=f"{class_name}.{method_name}:{str(e)}", - role="system") - self.eventmanager.send_event( - EventType.SystemError, - { - "type": "event", - "event_type": event.event_type, - "event_handle": f"{class_name}.{method_name}", - "error": str(e), - "traceback": traceback.format_exc() - } - ) - def __run_command(self, command: Dict[str, any], data_str: str = "", channel: MessageChannel = None, source: str = None, userid: Union[str, int] = None): """ @@ -292,18 +216,6 @@ class Command(metaclass=Singleton): # 没有参数 command['func']() - def stop(self): - """ - 停止事件处理线程 - """ - logger.info("正在停止事件处理...") - self._event.set() - try: - self._thread.join() - logger.info("事件处理停止完成") - except Exception as e: - logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}") - def get_commands(self): """ 获取命令列表 @@ -361,7 +273,7 @@ class Command(metaclass=Singleton): """ 发送插件命令 """ - EventManager().send_event(etype, data) + eventmanager.send_event(etype, data) @eventmanager.register(EventType.CommandExcute) def command_event(self, event: ManagerEvent) -> None: diff --git a/app/core/event.py b/app/core/event.py index 03c3914b..e0c8cc97 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,10 +1,14 @@ +import copy +import importlib import inspect import threading import time +import traceback import uuid from queue import PriorityQueue, Empty from typing import Callable, Dict, List, Union, Optional +from app.helper.message import MessageHelper from app.helper.thread import ThreadHelper from app.log import logger from app.schemas.types import EventType, ChainEventType @@ -12,8 +16,7 @@ from app.utils.singleton import Singleton DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数 -MAX_EVENT_WORKER_POOL_SIZE = 50 # 最大事件工作线程池大小 -EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的超时时间(秒) +EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒) class Event: @@ -55,11 +58,13 @@ class EventManager(metaclass=Singleton): EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件 """ - def __init__(self, max_workers: int = MAX_EVENT_WORKER_POOL_SIZE): - """ - :param max_workers: 线程池最大工作线程数 - """ - self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 + # 退出事件 + _event = threading.Event() + + def __init__(self): + self.messagehelper = MessageHelper() + self.__executor = ThreadHelper() # 动态线程池,用于消费事件 + self.__consumer_threads = [] # 用于保存启动的事件消费者线程 self.__event_queue = PriorityQueue() # 优先级队列 self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者 self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者 @@ -68,9 +73,30 @@ class EventManager(metaclass=Singleton): self.__lock = threading.Lock() # 线程锁 self.__condition = threading.Condition(self.__lock) # 条件变量 + def start(self): + """ + 开始广播事件处理线程 + """ # 启动消费者线程用于处理广播事件 + self._event.set() for _ in range(MIN_EVENT_CONSUMER_THREADS): - threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() + thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True) + thread.start() + self.__consumer_threads.append(thread) # 将线程对象保存到列表中 + + def stop(self): + """ + 停止广播事件处理线程 + """ + logger.info("正在停止事件处理...") + self._event.clear() # 停止广播事件处理 + try: + # 通过遍历保存的线程来等待它们完成 + for consumer_thread in self.__consumer_threads: + consumer_thread.join() + logger.info("事件处理停止完成") + except Exception as e: + logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}") def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: @@ -250,14 +276,15 @@ class EventManager(metaclass=Singleton): :param event: 要调度的事件对象 """ handlers = self.__chain_subscribers.get(event.event_type, {}) + if not handlers: + return self.__log_event_lifecycle(event, "started") for handler_id, (priority, handler) in handlers.items(): start_time = time.time() self.__safe_invoke_handler(handler, event) logger.debug( - f"Handler {handler.__qualname__} (Priority: {priority}) " - f"completed in {time.time() - start_time:.3f}s") - + f"Handler {handler.__qualname__} (Priority: {priority}) , completed in {time.time() - start_time:.3f}s" + ) self.__log_event_lifecycle(event, "completed") def __dispatch_broadcast_event(self, event: Event): @@ -266,41 +293,91 @@ class EventManager(metaclass=Singleton): :param event: 要调度的事件对象 """ handlers = self.__broadcast_subscribers.get(event.event_type, {}) + if not handlers: + return for handler_id, handler in handlers.items(): self.__executor.submit(self.__safe_invoke_handler, handler, event) def __safe_invoke_handler(self, handler: Callable, event: Event): """ - 安全调用事件处理器,捕获异常并记录日志 - :param handler: 要调用的处理器 + 调用处理器,处理链式或广播事件 + :param handler: 处理器 :param event: 事件对象 """ if not self.__is_handler_enabled(handler): - logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.") + logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution") return + + # 根据事件类型判断是否需要深复制 + is_broadcast_event = isinstance(event.event_type, EventType) + event_to_process = copy.deepcopy(event) if is_broadcast_event else event + + names = handler.__qualname__.split(".") + class_name, method_name = names[0], names[1] + try: - handler(event) + from app.core.plugin import PluginManager + + if class_name in PluginManager().get_plugin_ids(): + # 定义一个插件调用函数 + def plugin_callable(): + PluginManager().run_plugin_method(class_name, method_name, event_to_process) + + if is_broadcast_event: + self.__executor.submit(plugin_callable) + else: + plugin_callable() + else: + # 获取全局对象或模块类的实例 + class_obj = self.__get_class_instance(class_name) + if class_obj and hasattr(class_obj, method_name): + method = getattr(class_obj, method_name) + if is_broadcast_event: + self.__executor.submit(method, event_to_process) + else: + method(event_to_process) except Exception as e: self.__handle_event_error(event, handler, e) + @staticmethod + def __get_class_instance(class_name: str): + """ + 根据类名获取类实例,首先检查全局变量中是否存在该类,如果不存在则尝试动态导入模块。 + :param class_name: 类的名称 + :return: 类的实例 + """ + # 检查类是否在全局变量中 + if class_name in globals(): + class_obj = globals()[class_name]() + else: + # 如果类不在全局变量中,尝试动态导入模块并创建实例 + # 导入模块,除了插件和Command,只有chain能响应事件 + try: + module = importlib.import_module(f"app.chain.{class_name[:-5].lower()}") + class_obj = getattr(module, class_name)() + except Exception as e: + logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") + return None + + return class_obj + def __fixed_broadcast_consumer(self): """ 固定的后台广播消费者线程,持续从队列中提取事件 """ - while True: + while not self._event.is_set(): # 使用 Condition 优化队列的等待机制,避免频繁触发超时 with self.__condition: # 当队列为空时,线程进入等待状态,直到有新事件到来 while self.__event_queue.empty(): # 阻塞等待,直到有事件插入 self.__condition.wait() - try: priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) logger.debug(f"Fixed consumer processing event: {event}") self.__dispatch_broadcast_event(event) except Empty: - logger.debug("Queue is empty, waiting for new events.") + logger.debug("Queue is empty, waiting for new events") @staticmethod def __log_event_lifecycle(event: Event, stage: str): @@ -309,15 +386,28 @@ class EventManager(metaclass=Singleton): """ logger.debug(f"{stage} - {event}") - @staticmethod - def __handle_event_error(event: Event, handler: Callable, error: Exception): + def __handle_event_error(self, event: Event, handler: Callable, e: Exception): """ 全局错误处理器,用于处理事件处理中的异常 """ - logger.error( - f"Global error handler: Event {event.event_type.value} failed in handler {handler.__name__}: {str(error)}") - # 可以将错误事件重新发送到事件队列或执行其他逻辑 - # eventmanager.send_event(EventType.SystemError, {"error": str(error), "event_id": event.event_id}) + logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}") + + names = handler.__qualname__.split(".") + class_name, method_name = names[0], names[1] + + self.messagehelper.put(title=f"{event.event_type} 事件处理出错", + message=f"{class_name}.{method_name}:{str(e)}", + role="system") + self.send_event( + EventType.SystemError, + { + "type": "event", + "event_type": event.event_type, + "event_handle": f"{class_name}.{method_name}", + "error": str(e), + "traceback": traceback.format_exc() + } + ) def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]): """ diff --git a/app/main.py b/app/main.py index 94d5bdf8..668dbea6 100644 --- a/app/main.py +++ b/app/main.py @@ -30,6 +30,7 @@ except ImportError as e: print(error_message, file=sys.stderr) sys.exit(1) +from app.core.event import EventManager from app.core.plugin import PluginManager from app.db.init import init_db, update_db from app.helper.thread import ThreadHelper @@ -212,7 +213,7 @@ def shutdown_server(): PluginManager().stop() PluginManager().stop_monitor() # 停止事件消费 - Command().stop() + EventManager().stop() # 停止虚拟显示 DisplayHelper().stop() # 停止定时服务 @@ -245,8 +246,10 @@ def start_module(): Monitor() # 启动定时服务 Scheduler() - # 启动事件消费 + # 加载命令 Command() + # 启动事件消费 + EventManager().start() # 初始化路由 init_routers() # 启动前端服务 From dea8fc5486d9ff741fe017967fad6f74bf296ba1 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 21:45:52 +0800 Subject: [PATCH 9/9] feat(event): optimized event execution flow --- app/core/event.py | 68 ++++++++++++++++++++++++++++------------------ app/core/plugin.py | 1 + app/main.py | 4 +-- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index e0c8cc97..c498f82c 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -43,6 +43,13 @@ class Event: event_kind = Event.get_event_kind(self.event_type) return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>" + def __lt__(self, other): + """ + 定义事件对象的比较规则,基于优先级比较 + 优先级小的事件会被认为“更小”,优先级高的事件将被认为“更大” + """ + return self.priority < other.priority + @staticmethod def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str: """ @@ -59,10 +66,10 @@ class EventManager(metaclass=Singleton): """ # 退出事件 - _event = threading.Event() + __event = threading.Event() def __init__(self): - self.messagehelper = MessageHelper() + self.__messagehelper = MessageHelper() self.__executor = ThreadHelper() # 动态线程池,用于消费事件 self.__consumer_threads = [] # 用于保存启动的事件消费者线程 self.__event_queue = PriorityQueue() # 优先级队列 @@ -78,7 +85,7 @@ class EventManager(metaclass=Singleton): 开始广播事件处理线程 """ # 启动消费者线程用于处理广播事件 - self._event.set() + self.__event.set() for _ in range(MIN_EVENT_CONSUMER_THREADS): thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True) thread.start() @@ -89,7 +96,7 @@ class EventManager(metaclass=Singleton): 停止广播事件处理线程 """ logger.info("正在停止事件处理...") - self._event.clear() # 停止广播事件处理 + self.__event.clear() # 停止广播事件处理 try: # 通过遍历保存的线程来等待它们完成 for consumer_thread in self.__consumer_threads: @@ -132,19 +139,28 @@ class EventManager(metaclass=Singleton): # 链式事件,按优先级排序 if event_type not in self.__chain_subscribers: self.__chain_subscribers[event_type] = {} - self.__chain_subscribers[event_type][handler_identifier] = (priority, handler) + handlers = self.__chain_subscribers[event_type] + if handler_identifier in handlers: + handlers.pop(handler_identifier) + else: + logger.debug( + f"Subscribed to chain event: {event_type.value}, " + f"Priority: {priority} - {handler_identifier}") + handlers[handler_identifier] = (priority, handler) # 根据优先级排序 self.__chain_subscribers[event_type] = dict( sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0]) ) - logger.debug( - f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}") else: # 广播事件 if event_type not in self.__broadcast_subscribers: self.__broadcast_subscribers[event_type] = {} - self.__broadcast_subscribers[event_type][handler_identifier] = handler - logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") + handlers = self.__broadcast_subscribers[event_type] + if handler_identifier in handlers: + handlers.pop(handler_identifier) + else: + logger.debug(f"Subscribed to broadcast event: {event_type.value} - {handler_identifier}") + handlers[handler_identifier] = handler def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable): """ @@ -157,10 +173,10 @@ class EventManager(metaclass=Singleton): if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers: self.__chain_subscribers[event_type].pop(handler_identifier, None) - logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}") + logger.debug(f"Unsubscribed from chain event: {event_type.value} - {handler_identifier}") elif event_type in self.__broadcast_subscribers: self.__broadcast_subscribers[event_type].pop(handler_identifier, None) - logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") + logger.debug(f"Unsubscribed from broadcast event: {event_type.value} - {handler_identifier}") def disable_event_handler(self, target: Union[Callable, type]): """ @@ -168,12 +184,14 @@ class EventManager(metaclass=Singleton): :param target: 处理器函数或类 """ identifier = self.__get_handler_identifier(target) + if identifier in self.__disabled_handlers or identifier in self.__disabled_classes: + return if isinstance(target, type): self.__disabled_classes.add(identifier) - logger.debug(f"Disabled event handler class: {identifier}") + logger.debug(f"Disabled event handler class - {identifier}") else: self.__disabled_handlers.add(identifier) - logger.debug(f"Disabled event handler: {identifier}") + logger.debug(f"Disabled event handler - {identifier}") def enable_event_handler(self, target: Union[Callable, type]): """ @@ -183,10 +201,10 @@ class EventManager(metaclass=Singleton): identifier = self.__get_handler_identifier(target) if isinstance(target, type): self.__disabled_classes.discard(identifier) - logger.debug(f"Enabled event handler class: {identifier}") + logger.debug(f"Enabled event handler class - {identifier}") else: self.__disabled_handlers.discard(identifier) - logger.debug(f"Enabled event handler: {identifier}") + logger.debug(f"Enabled event handler - {identifier}") def visualize_handlers(self) -> List[Dict]: """ @@ -283,7 +301,8 @@ class EventManager(metaclass=Singleton): start_time = time.time() self.__safe_invoke_handler(handler, event) logger.debug( - f"Handler {handler.__qualname__} (Priority: {priority}) , completed in {time.time() - start_time:.3f}s" + f"Handler {self.__get_handler_identifier(handler)} (Priority: {priority}) ," + f" completed in {time.time() - start_time:.3f}s" ) self.__log_event_lifecycle(event, "completed") @@ -305,7 +324,7 @@ class EventManager(metaclass=Singleton): :param event: 事件对象 """ if not self.__is_handler_enabled(handler): - logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution") + logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution") return # 根据事件类型判断是否需要深复制 @@ -365,16 +384,13 @@ class EventManager(metaclass=Singleton): """ 固定的后台广播消费者线程,持续从队列中提取事件 """ - while not self._event.is_set(): + while self.__event.is_set(): # 使用 Condition 优化队列的等待机制,避免频繁触发超时 with self.__condition: - # 当队列为空时,线程进入等待状态,直到有新事件到来 - while self.__event_queue.empty(): - # 阻塞等待,直到有事件插入 - self.__condition.wait() + # 阻塞等待,直到有事件插入 + self.__condition.wait() try: priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) - logger.debug(f"Fixed consumer processing event: {event}") self.__dispatch_broadcast_event(event) except Empty: logger.debug("Queue is empty, waiting for new events") @@ -395,9 +411,9 @@ class EventManager(metaclass=Singleton): names = handler.__qualname__.split(".") class_name, method_name = names[0], names[1] - self.messagehelper.put(title=f"{event.event_type} 事件处理出错", - message=f"{class_name}.{method_name}:{str(e)}", - role="system") + self.__messagehelper.put(title=f"{event.event_type} 事件处理出错", + message=f"{class_name}.{method_name}:{str(e)}", + role="system") self.send_event( EventType.SystemError, { diff --git a/app/core/plugin.py b/app/core/plugin.py index ff365006..dc03ff99 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -205,6 +205,7 @@ class PluginManager(metaclass=Singleton): for plugin_id, plugin in self._running_plugins.items(): if pid and plugin_id != pid: continue + eventmanager.disable_event_handler(type(plugin)) self.__stop_plugin(plugin) # 清空对像 if pid: diff --git a/app/main.py b/app/main.py index 668dbea6..803c6094 100644 --- a/app/main.py +++ b/app/main.py @@ -238,6 +238,8 @@ def start_module(): ResourceHelper() # 加载模块 ModuleManager() + # 启动事件消费 + EventManager().start() # 安装在线插件 PluginManager().sync() # 加载插件 @@ -248,8 +250,6 @@ def start_module(): Scheduler() # 加载命令 Command() - # 启动事件消费 - EventManager().start() # 初始化路由 init_routers() # 启动前端服务