feat(event): optimized event execution flow

This commit is contained in:
InfinityPacer
2024-09-20 21:45:52 +08:00
parent 857383c8d0
commit dea8fc5486
3 changed files with 45 additions and 28 deletions

View File

@@ -43,6 +43,13 @@ class Event:
event_kind = Event.get_event_kind(self.event_type)
return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>"
def __lt__(self, other):
"""
定义事件对象的比较规则,基于优先级比较
优先级小的事件会被认为“更小”,优先级高的事件将被认为“更大”
"""
return self.priority < other.priority
@staticmethod
def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str:
"""
@@ -59,10 +66,10 @@ class EventManager(metaclass=Singleton):
"""
# 退出事件
_event = threading.Event()
__event = threading.Event()
def __init__(self):
self.messagehelper = MessageHelper()
self.__messagehelper = MessageHelper()
self.__executor = ThreadHelper() # 动态线程池,用于消费事件
self.__consumer_threads = [] # 用于保存启动的事件消费者线程
self.__event_queue = PriorityQueue() # 优先级队列
@@ -78,7 +85,7 @@ class EventManager(metaclass=Singleton):
开始广播事件处理线程
"""
# 启动消费者线程用于处理广播事件
self._event.set()
self.__event.set()
for _ in range(MIN_EVENT_CONSUMER_THREADS):
thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True)
thread.start()
@@ -89,7 +96,7 @@ class EventManager(metaclass=Singleton):
停止广播事件处理线程
"""
logger.info("正在停止事件处理...")
self._event.clear() # 停止广播事件处理
self.__event.clear() # 停止广播事件处理
try:
# 通过遍历保存的线程来等待它们完成
for consumer_thread in self.__consumer_threads:
@@ -132,19 +139,28 @@ class EventManager(metaclass=Singleton):
# 链式事件,按优先级排序
if event_type not in self.__chain_subscribers:
self.__chain_subscribers[event_type] = {}
self.__chain_subscribers[event_type][handler_identifier] = (priority, handler)
handlers = self.__chain_subscribers[event_type]
if handler_identifier in handlers:
handlers.pop(handler_identifier)
else:
logger.debug(
f"Subscribed to chain event: {event_type.value}, "
f"Priority: {priority} - {handler_identifier}")
handlers[handler_identifier] = (priority, handler)
# 根据优先级排序
self.__chain_subscribers[event_type] = dict(
sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0])
)
logger.debug(
f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}")
else:
# 广播事件
if event_type not in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type] = {}
self.__broadcast_subscribers[event_type][handler_identifier] = handler
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
handlers = self.__broadcast_subscribers[event_type]
if handler_identifier in handlers:
handlers.pop(handler_identifier)
else:
logger.debug(f"Subscribed to broadcast event: {event_type.value} - {handler_identifier}")
handlers[handler_identifier] = handler
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
"""
@@ -157,10 +173,10 @@ class EventManager(metaclass=Singleton):
if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers:
self.__chain_subscribers[event_type].pop(handler_identifier, None)
logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}")
logger.debug(f"Unsubscribed from chain event: {event_type.value} - {handler_identifier}")
elif event_type in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type].pop(handler_identifier, None)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
logger.debug(f"Unsubscribed from broadcast event: {event_type.value} - {handler_identifier}")
def disable_event_handler(self, target: Union[Callable, type]):
"""
@@ -168,12 +184,14 @@ class EventManager(metaclass=Singleton):
:param target: 处理器函数或类
"""
identifier = self.__get_handler_identifier(target)
if identifier in self.__disabled_handlers or identifier in self.__disabled_classes:
return
if isinstance(target, type):
self.__disabled_classes.add(identifier)
logger.debug(f"Disabled event handler class: {identifier}")
logger.debug(f"Disabled event handler class - {identifier}")
else:
self.__disabled_handlers.add(identifier)
logger.debug(f"Disabled event handler: {identifier}")
logger.debug(f"Disabled event handler - {identifier}")
def enable_event_handler(self, target: Union[Callable, type]):
"""
@@ -183,10 +201,10 @@ class EventManager(metaclass=Singleton):
identifier = self.__get_handler_identifier(target)
if isinstance(target, type):
self.__disabled_classes.discard(identifier)
logger.debug(f"Enabled event handler class: {identifier}")
logger.debug(f"Enabled event handler class - {identifier}")
else:
self.__disabled_handlers.discard(identifier)
logger.debug(f"Enabled event handler: {identifier}")
logger.debug(f"Enabled event handler - {identifier}")
def visualize_handlers(self) -> List[Dict]:
"""
@@ -283,7 +301,8 @@ class EventManager(metaclass=Singleton):
start_time = time.time()
self.__safe_invoke_handler(handler, event)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) , completed in {time.time() - start_time:.3f}s"
f"Handler {self.__get_handler_identifier(handler)} (Priority: {priority}) ,"
f" completed in {time.time() - start_time:.3f}s"
)
self.__log_event_lifecycle(event, "completed")
@@ -305,7 +324,7 @@ class EventManager(metaclass=Singleton):
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution")
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
# 根据事件类型判断是否需要深复制
@@ -365,16 +384,13 @@ class EventManager(metaclass=Singleton):
"""
固定的后台广播消费者线程,持续从队列中提取事件
"""
while not self._event.is_set():
while self.__event.is_set():
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
with self.__condition:
# 当队列为空时,线程进入等待状态,直到有事件到来
while self.__event_queue.empty():
# 阻塞等待,直到有事件插入
self.__condition.wait()
# 阻塞等待,直到有事件插入
self.__condition.wait()
try:
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
logger.debug(f"Fixed consumer processing event: {event}")
self.__dispatch_broadcast_event(event)
except Empty:
logger.debug("Queue is empty, waiting for new events")
@@ -395,9 +411,9 @@ class EventManager(metaclass=Singleton):
names = handler.__qualname__.split(".")
class_name, method_name = names[0], names[1]
self.messagehelper.put(title=f"{event.event_type} 事件处理出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.__messagehelper.put(title=f"{event.event_type} 事件处理出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.send_event(
EventType.SystemError,
{

View File

@@ -205,6 +205,7 @@ class PluginManager(metaclass=Singleton):
for plugin_id, plugin in self._running_plugins.items():
if pid and plugin_id != pid:
continue
eventmanager.disable_event_handler(type(plugin))
self.__stop_plugin(plugin)
# 清空对像
if pid:

View File

@@ -238,6 +238,8 @@ def start_module():
ResourceHelper()
# 加载模块
ModuleManager()
# 启动事件消费
EventManager().start()
# 安装在线插件
PluginManager().sync()
# 加载插件
@@ -248,8 +250,6 @@ def start_module():
Scheduler()
# 加载命令
Command()
# 启动事件消费
EventManager().start()
# 初始化路由
init_routers()
# 启动前端服务