From 688693b31f576aa0a4aaaf2ec0cb8f93986e8e0b Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 20 Sep 2024 18:42:29 +0800 Subject: [PATCH] feat(event): use dict for subscribers and replace handler if exists --- app/core/event.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index 157d14bc..03c3914b 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -61,8 +61,8 @@ class EventManager(metaclass=Singleton): """ self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件 self.__event_queue = PriorityQueue() # 优先级队列 - self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者 - self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者 + self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者 + self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者 self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 @@ -97,22 +97,27 @@ class EventManager(metaclass=Singleton): 注册事件处理器,将处理器添加到对应的事件订阅列表中 :param event_type: 事件类型 (EventType 或 ChainEventType) :param handler: 处理器 - :param priority: 可选,事件的优先级,默认为 10 + :param priority: 可选,链式事件的优先级,默认为 10;广播事件不需要优先级 """ with self.__lock: + handler_identifier = self.__get_handler_identifier(handler) + if isinstance(event_type, ChainEventType): # 链式事件,按优先级排序 if event_type not in self.__chain_subscribers: - self.__chain_subscribers[event_type] = [] - self.__chain_subscribers[event_type].append((priority, handler)) - self.__chain_subscribers[event_type].sort(key=lambda x: x[0]) # 按优先级排序 + self.__chain_subscribers[event_type] = {} + self.__chain_subscribers[event_type][handler_identifier] = (priority, handler) + # 根据优先级排序 + self.__chain_subscribers[event_type] = dict( + sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0]) + ) logger.debug( f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}") else: # 广播事件 if event_type not in self.__broadcast_subscribers: - self.__broadcast_subscribers[event_type] = [] - self.__broadcast_subscribers[event_type].append(handler) + self.__broadcast_subscribers[event_type] = {} + self.__broadcast_subscribers[event_type][handler_identifier] = handler logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}") def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable): @@ -122,12 +127,13 @@ class EventManager(metaclass=Singleton): :param handler: 要移除的处理器 """ with self.__lock: + handler_identifier = self.__get_handler_identifier(handler) + if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers: - self.__chain_subscribers[event_type] = [h for h in self.__chain_subscribers[event_type] if - h[1] != handler] + self.__chain_subscribers[event_type].pop(handler_identifier, None) logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}") elif event_type in self.__broadcast_subscribers: - self.__broadcast_subscribers[event_type].remove(handler) + self.__broadcast_subscribers[event_type].pop(handler_identifier, None) logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}") def disable_event_handler(self, target: Union[Callable, type]): @@ -243,9 +249,9 @@ class EventManager(metaclass=Singleton): 同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间 :param event: 要调度的事件对象 """ - handlers = self.__chain_subscribers.get(event.event_type, []) + handlers = self.__chain_subscribers.get(event.event_type, {}) self.__log_event_lifecycle(event, "started") - for priority, handler in handlers: + for handler_id, (priority, handler) in handlers.items(): start_time = time.time() self.__safe_invoke_handler(handler, event) logger.debug( @@ -259,8 +265,8 @@ class EventManager(metaclass=Singleton): 异步方式调度广播事件,通过线程池逐个调用事件处理器 :param event: 要调度的事件对象 """ - handlers = self.__broadcast_subscribers.get(event.event_type, []) - for handler in handlers: + handlers = self.__broadcast_subscribers.get(event.event_type, {}) + for handler_id, handler in handlers.items(): self.__executor.submit(self.__safe_invoke_handler, handler, event) def __safe_invoke_handler(self, handler: Callable, event: Event):