feat(event): use dict for subscribers and replace handler if exists

This commit is contained in:
InfinityPacer
2024-09-20 18:42:29 +08:00
parent ef0768ec44
commit 688693b31f

View File

@@ -61,8 +61,8 @@ class EventManager(metaclass=Singleton):
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_queue = PriorityQueue() # 优先级队列
self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者
self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
@@ -97,22 +97,27 @@ class EventManager(metaclass=Singleton):
注册事件处理器,将处理器添加到对应的事件订阅列表中
:param event_type: 事件类型 (EventType 或 ChainEventType)
:param handler: 处理器
:param priority: 可选,事件的优先级,默认为 10
:param priority: 可选,链式事件的优先级,默认为 10;广播事件不需要优先级
"""
with self.__lock:
handler_identifier = self.__get_handler_identifier(handler)
if isinstance(event_type, ChainEventType):
# 链式事件,按优先级排序
if event_type not in self.__chain_subscribers:
self.__chain_subscribers[event_type] = []
self.__chain_subscribers[event_type].append((priority, handler))
self.__chain_subscribers[event_type].sort(key=lambda x: x[0]) # 优先级排序
self.__chain_subscribers[event_type] = {}
self.__chain_subscribers[event_type][handler_identifier] = (priority, handler)
# 根据优先级排序
self.__chain_subscribers[event_type] = dict(
sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0])
)
logger.debug(
f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}")
else:
# 广播事件
if event_type not in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type] = []
self.__broadcast_subscribers[event_type].append(handler)
self.__broadcast_subscribers[event_type] = {}
self.__broadcast_subscribers[event_type][handler_identifier] = handler
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
@@ -122,12 +127,13 @@ class EventManager(metaclass=Singleton):
:param handler: 要移除的处理器
"""
with self.__lock:
handler_identifier = self.__get_handler_identifier(handler)
if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers:
self.__chain_subscribers[event_type] = [h for h in self.__chain_subscribers[event_type] if
h[1] != handler]
self.__chain_subscribers[event_type].pop(handler_identifier, None)
logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}")
elif event_type in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type].remove(handler)
self.__broadcast_subscribers[event_type].pop(handler_identifier, None)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
def disable_event_handler(self, target: Union[Callable, type]):
@@ -243,9 +249,9 @@ class EventManager(metaclass=Singleton):
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, [])
handlers = self.__chain_subscribers.get(event.event_type, {})
self.__log_event_lifecycle(event, "started")
for priority, handler in handlers:
for handler_id, (priority, handler) in handlers.items():
start_time = time.time()
self.__safe_invoke_handler(handler, event)
logger.debug(
@@ -259,8 +265,8 @@ class EventManager(metaclass=Singleton):
异步方式调度广播事件,通过线程池逐个调用事件处理器
:param event: 要调度的事件对象
"""
handlers = self.__broadcast_subscribers.get(event.event_type, [])
for handler in handlers:
handlers = self.__broadcast_subscribers.get(event.event_type, {})
for handler_id, handler in handlers.items():
self.__executor.submit(self.__safe_invoke_handler, handler, event)
def __safe_invoke_handler(self, handler: Callable, event: Event):