From 3bee5a8a866786a45392c906bbd4443b96d4fc10 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 13:52:09 +0800 Subject: [PATCH] feat(event): separate implementation of broadcast and chain --- app/core/event.py | 196 +++++++++++++++++++++++++++---------------- app/schemas/types.py | 7 +- 2 files changed, 127 insertions(+), 76 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index d58319fb..a2f93e8f 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,12 +1,12 @@ import threading +import time import uuid -from enum import Enum from queue import PriorityQueue, Empty from typing import Callable, Dict, List, Union, Optional from app.helper.thread import ThreadHelper from app.log import logger -from app.schemas.types import EventType, SyncEventType +from app.schemas.types import EventType, ChainEventType from app.utils.singleton import Singleton DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 @@ -20,12 +20,12 @@ class Event: 事件类,封装事件的基本信息 """ - def __init__(self, event_type: Union[EventType, SyncEventType], event_data: Optional[Dict] = None, - priority: int = 10): + def __init__(self, event_type: Union[EventType, ChainEventType], event_data: Optional[Dict] = None, + priority: int = DEFAULT_EVENT_PRIORITY): """ - :param event_type: 事件的类型,支持 EventType 或 SyncEventType + :param event_type: 事件的类型,支持 EventType 或 ChainEventType :param event_data: 可选,事件携带的数据,默认为空字典 - :param priority: 可选,广播事件的优先级,默认为 10 + :param priority: 可选,事件的优先级,默认为 10 """ self.event_id = str(uuid.uuid4()) # 事件ID self.event_type = event_type # 事件类型 @@ -40,10 +40,10 @@ class Event: return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>" @staticmethod - def get_event_kind(event_type: Union[EventType, SyncEventType]) -> str: + def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str: """ 根据事件类型判断事件是广播事件还是链式事件 - :param event_type: 事件类型,支持 EventType 或 SyncEventType + :param event_type: 事件类型,支持 EventType 或 ChainEventType :return: 返回 Broadcast Event 或 Chain Event """ return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event" @@ -60,60 +60,75 @@ class EventManager(metaclass=Singleton): """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__event_queue = PriorityQueue() # 优先级队列 - self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表 + self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者 + self.__chain_subscribers: Dict[ + ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器) self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 self.__condition = threading.Condition(self.__lock) # 条件变量 - # 启动消费者线程用于处理异步事件 + # 启动消费者线程用于处理广播事件 for _ in range(MIN_EVENT_CONSUMER_THREADS): - threading.Thread(target=self.__fixed_consumer, daemon=True).start() + threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start() - def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, + def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None, priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]: """ 发送事件,根据事件类型决定是广播事件还是链式事件 - :param etype: 事件类型 (EventType 或 SyncEventType) + :param etype: 事件类型 (EventType 或 ChainEventType) :param data: 可选,事件数据 :param priority: 广播事件的优先级,默认为 10 :return: 如果是链式事件,返回处理后的事件数据;否则返回 None """ event = Event(etype, data, priority) if isinstance(etype, EventType): - self.__trigger_event_async(event) + self.__trigger_broadcast_event(event) with self.__condition: self.__condition.notify() - elif isinstance(etype, SyncEventType): - return self.__trigger_event(event) + elif isinstance(etype, ChainEventType): + return self.__trigger_chain_event(event) else: logger.error(f"Unknown event type: {etype}") - def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]): + def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None], + priority: int = DEFAULT_EVENT_PRIORITY): """ 注册事件处理器,将处理器添加到对应的事件订阅列表中 - :param event_type: 事件类型 (EventType 或 SyncEventType) + :param event_type: 事件类型 (EventType 或 ChainEventType) :param handler: 处理器 + :param priority: 可选,事件的优先级,默认为 10 """ with self.__lock: - if event_type not in self.__subscribers: - self.__subscribers[event_type] = [] - self.__subscribers[event_type].append(handler) - event_kind = Event.get_event_kind(event_type) - logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") + if isinstance(event_type, ChainEventType): + # 链式事件,按优先级排序 + if event_type not in self.__chain_subscribers: + self.__chain_subscribers[event_type] = [] + self.__chain_subscribers[event_type].append((priority, handler)) + self.__chain_subscribers[event_type].sort(key=lambda x: x[0]) # 按优先级排序 + logger.debug( + f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}") + else: + # 广播事件 + if event_type not in self.__broadcast_subscribers: + self.__broadcast_subscribers[event_type] = [] + self.__broadcast_subscribers[event_type].append(handler) + logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") - def remove_event_listener(self, event_type: Union[EventType, SyncEventType], - handler: Callable[[Dict], None]): + def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]): """ 移除事件处理器,将处理器从对应事件的订阅列表中删除 - :param event_type: 事件类型 (EventType 或 SyncEventType) + :param event_type: 事件类型 (EventType 或 ChainEventType) :param handler: 要移除的处理器 """ with self.__lock: - if event_type in self.__subscribers: - self.__subscribers[event_type].remove(handler) - event_kind = Event.get_event_kind(event_type) - logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}") + if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers: + self.__chain_subscribers[event_type] = [h for h in self.__chain_subscribers[event_type] if + h[1] != handler] + logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}") + elif event_type in self.__broadcast_subscribers: + self.__broadcast_subscribers[event_type].remove(handler) + logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None): """ @@ -141,20 +156,26 @@ class EventManager(metaclass=Singleton): self.__disabled_handlers.discard(handler_name) logger.debug(f"Enabled event handler: {handler_name}") - def check(self, etype: Union[EventType, SyncEventType]) -> bool: + def check(self, etype: Union[EventType, ChainEventType]) -> bool: """ 检查是否有启用的事件处理器可以响应某个事件类型 - :param etype: 事件类型 (EventType 或 SyncEventType) + :param etype: 事件类型 (EventType 或 ChainEventType) :return: 返回是否存在可用的处理器 """ - if etype not in self.__subscribers: - return False - handlers = self.__subscribers.get(etype, []) - return any( - handler.__name__ not in self.__disabled_handlers and - handler.__qualname__.split(".")[0] not in self.__disabled_classes - for handler in handlers - ) + if isinstance(etype, ChainEventType): + handlers = self.__chain_subscribers.get(etype, []) + return any( + handler.__name__ not in self.__disabled_handlers and + handler.__qualname__.split(".")[0] not in self.__disabled_classes + for _, handler in handlers + ) + else: + handlers = self.__broadcast_subscribers.get(etype, []) + return any( + handler.__name__ not in self.__disabled_handlers and + handler.__qualname__.split(".")[0] not in self.__disabled_classes + for handler in handlers + ) def visualize_handlers(self) -> List[Dict[str, str]]: """ @@ -162,14 +183,12 @@ class EventManager(metaclass=Singleton): :return: 处理器列表,包含处理器名称、类名和状态 """ handler_info = [] - with (self.__lock): - for event_type, handlers in self.__subscribers.items(): + with self.__lock: + for event_type, handlers in self.__broadcast_subscribers.items(): for handler in handlers: class_name = handler.__qualname__.split(".")[0] status = ( - "disabled" - if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes - else "enabled" + "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" ) handler_info.append({ "event_type": event_type.value, @@ -177,53 +196,71 @@ class EventManager(metaclass=Singleton): "class_name": class_name, "status": status }) + for event_type, handlers in self.__chain_subscribers.items(): + for priority, handler in handlers: + class_name = handler.__qualname__.split(".")[0] + status = ( + "disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled" + ) + handler_info.append({ + "event_type": event_type.value, + "handler_name": handler.__name__, + "class_name": class_name, + "priority": priority, + "status": status + }) return handler_info - def __trigger_event(self, event: Event) -> Dict: + def __trigger_chain_event(self, event: Event) -> Dict: """ 触发链式事件,按顺序调用订阅的处理器 :param event: 要处理的事件对象 :return: 返回处理后的事件数据 """ logger.debug(f"Triggering synchronous chain event: {event}") - self.__dispatch_event(event) + self.__dispatch_chain_event(event) return event.event_data - def __trigger_event_async(self, event: Event): + def __trigger_broadcast_event(self, event: Event): """ 触发广播事件,将事件插入到优先级队列中 :param event: 要处理的事件对象 """ - logger.debug(f"Triggering asynchronous broadcast event: {event}") + logger.debug(f"Triggering broadcast event: {event}") self.__event_queue.put((event.priority, event)) - def __dispatch_event(self, event: Event): + def __dispatch_chain_event(self, event: Event): """ - 同步方式调度事件,逐个调用事件处理器 + 同步方式调度链式事件,按优先级顺序逐个调用事件处理器 :param event: 要调度的事件对象 """ - handlers = self.__subscribers.get(event.event_type, []) - for handler in handlers: + handlers = self.__chain_subscribers.get(event.event_type, []) + self.__log_event_lifecycle(event, "started") + for priority, handler in handlers: class_name = handler.__qualname__.split(".")[0] if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: + start_time = time.time() try: handler(event.event_data) + logger.debug( + f"Handler {handler.__qualname__} (Priority: {priority}) " + f"completed in {time.time() - start_time:.3f}s") except Exception as e: - logger.error(f"Error handling event {event.event_type}: {str(e)}", exc_info=True) + self.__handle_event_error(event, handler, e) + self.__log_event_lifecycle(event, "completed") - def __dispatch_event_async(self, event: Event): + def __dispatch_broadcast_event(self, event: Event): """ - 异步方式调度事件,通过线程池逐个调用事件处理器 + 异步方式调度广播事件,通过线程池逐个调用事件处理器 :param event: 要调度的事件对象 """ - handlers = self.__subscribers.get(event.event_type, []) + handlers = self.__broadcast_subscribers.get(event.event_type, []) for handler in handlers: class_name = handler.__qualname__.split(".")[0] if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes: self.__executor.submit(self.__safe_invoke_handler, handler, event) - @staticmethod - def __safe_invoke_handler(handler: Callable[[Dict], None], event: Event): + def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event): """ 安全调用事件处理器,捕获异常并记录日志 :param handler: 要调用的处理器 @@ -232,11 +269,11 @@ class EventManager(metaclass=Singleton): try: handler(event.event_data) except Exception as e: - logger.error(f"Error in asynchronous handler {handler.__name__}: {str(e)}", exc_info=True) + self.__handle_event_error(event, handler, e) - def __fixed_consumer(self): + def __fixed_broadcast_consumer(self): """ - 固定的后台消费者线程,持续从队列中提取事件 + 固定的后台广播消费者线程,持续从队列中提取事件 """ while True: # 使用 Condition 优化队列的等待机制,避免频繁触发超时 @@ -249,16 +286,33 @@ class EventManager(metaclass=Singleton): try: priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) logger.debug(f"Fixed consumer processing event: {event}") - self.__dispatch_event_async(event) + self.__dispatch_broadcast_event(event) except Empty: logger.debug("Queue is empty, waiting for new events.") - def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]): + @staticmethod + def __log_event_lifecycle(event: Event, stage: str): + """ + 记录事件的生命周期日志 + """ + logger.debug(f"{stage} - {event}") + + @staticmethod + def __handle_event_error(event: Event, handler: Callable, error: Exception): + """ + 全局错误处理器,用于处理事件处理中的异常 + """ + logger.error( + f"Global error handler: Event {event.event_type.value} failed in handler {handler.__name__}: {str(error)}") + # 可以将错误事件重新发送到事件队列或执行其他逻辑 + # eventmanager.send_event(EventType.SystemError, {"error": str(error), "event_id": event.event_id}) + + def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]): """ 事件注册装饰器,用于将函数注册为事件的处理器 :param etype: - - 单个事件类型成员 (如 EventType.MetadataScrape, SyncEventType.PluginAction) - - 事件类型类 (EventType, SyncEventType) + - 单个事件类型成员 (如 EventType.MetadataScrape, ChainEventType.PluginAction) + - 事件类型类 (EventType, ChainEventType) - 或事件类型成员的列表 """ @@ -268,17 +322,17 @@ class EventManager(metaclass=Singleton): # 如果传入的是列表,处理每个事件类型 if isinstance(etype, list): for et in etype: - if isinstance(et, (EventType, SyncEventType)): + if isinstance(et, (EventType, ChainEventType)): event_list.append(et) else: raise ValueError(f"列表中无效的事件类型: {et}") - # 如果传入的是 EventType 或 SyncEventType 类,提取该类中的所有成员 - elif isinstance(etype, type) and issubclass(etype, Enum): + # 如果传入的是 EventType 或 ChainEventType 类,提取该类中的所有成员 + elif isinstance(etype, type) and issubclass(etype, (EventType, ChainEventType)): event_list.extend(etype.__members__.values()) - # 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 SyncEventType.PluginAction) - elif isinstance(etype, (EventType, SyncEventType)): + # 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 ChainEventType.PluginAction) + elif isinstance(etype, (EventType, ChainEventType)): event_list.append(etype) else: diff --git a/app/schemas/types.py b/app/schemas/types.py index 87dd8815..5dc30960 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -57,11 +57,8 @@ class EventType(Enum): # 同步链式事件 -class SyncEventType(Enum): - # 刮削元数据 - MetadataScrape = "metadata.scrape" - # 插件动作 - PluginAction = "plugin.action" +class ChainEventType(Enum): + pass # 系统配置Key字典