feat:键式事件协程处理

This commit is contained in:
jxxghp
2025-07-31 17:27:15 +08:00
parent 1e61e60d73
commit aea44c1d97
5 changed files with 251 additions and 41 deletions

View File

@@ -7,7 +7,7 @@ import time
import traceback
import uuid
from queue import Empty, PriorityQueue
from typing import Callable, Dict, List, Optional, Union
from typing import Callable, Dict, List, Optional, Tuple, Union
from app.helper.thread import ThreadHelper
from app.log import logger
@@ -142,6 +142,25 @@ class EventManager(metaclass=Singleton):
logger.error(f"Unknown event type: {etype}")
return None
async def async_send_event(self, etype: Union[EventType, ChainEventType],
data: Optional[Union[Dict, ChainEventData]] = None,
priority: Optional[int] = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
"""
异步发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 ChainEventType)
:param data: 可选,事件数据
:param priority: 广播事件的优先级,默认为 10
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
"""
event = Event(etype, data, priority)
if isinstance(etype, EventType):
return self.__trigger_broadcast_event(event)
elif isinstance(etype, ChainEventType):
return await self.__trigger_chain_event_async(event)
else:
logger.error(f"Unknown event type: {etype}")
return None
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
priority: Optional[int] = DEFAULT_EVENT_PRIORITY):
"""
@@ -325,6 +344,14 @@ class EventManager(metaclass=Singleton):
dispatch = self.__dispatch_chain_event(event)
return event if dispatch else None
async def __trigger_chain_event_async(self, event: Event) -> Optional[Event]:
"""
异步触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
"""
logger.debug(f"Triggering asynchronous chain event: {event}")
dispatch = await self.__dispatch_chain_event_async(event)
return event if dispatch else None
def __trigger_broadcast_event(self, event: Event):
"""
触发广播事件,将事件插入到优先级队列中
@@ -362,6 +389,35 @@ class EventManager(metaclass=Singleton):
self.__log_event_lifecycle(event, "Completed")
return True
async def __dispatch_chain_event_async(self, event: Event) -> bool:
"""
异步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, {})
if not handlers:
logger.debug(f"No handlers found for chain event: {event}")
return False
# 过滤出启用的处理器
enabled_handlers = {handler_id: (priority, handler) for handler_id, (priority, handler) in handlers.items()
if self.__is_handler_enabled(handler)}
if not enabled_handlers:
logger.debug(f"No enabled handlers found for chain event: {event}. Skipping execution.")
return False
self.__log_event_lifecycle(event, "Started")
for handler_id, (priority, handler) in enabled_handlers.items():
start_time = time.time()
await self.__safe_invoke_handler_async(handler, event)
logger.debug(
f"{self.__get_handler_identifier(handler)} (Priority: {priority}), "
f"completed in {time.time() - start_time:.3f}s for event: {event}"
)
self.__log_event_lifecycle(event, "Completed")
return True
def __dispatch_broadcast_event(self, event: Event):
"""
异步方式调度广播事件,通过线程池逐个调用事件处理器
@@ -388,45 +444,178 @@ class EventManager(metaclass=Singleton):
is_broadcast_event = isinstance(event.event_type, EventType)
event_to_process = copy.deepcopy(event) if is_broadcast_event else event
names = handler.__qualname__.split(".")
class_name, method_name = names[0], names[1]
try:
from app.core.plugin import PluginManager
from app.core.module import ModuleManager
if class_name in PluginManager().get_plugin_ids():
def plugin_callable():
"""
插件调用函数
"""
PluginManager().run_plugin_method(class_name, method_name, event_to_process)
if is_broadcast_event:
self.__executor.submit(plugin_callable)
else:
plugin_callable()
elif class_name in ModuleManager().get_module_ids():
module = ModuleManager().get_running_module(class_name)
if module:
method = getattr(module, method_name, None)
if method:
if is_broadcast_event:
self.__executor.submit(method, event_to_process)
else:
method(event_to_process)
else:
# 获取全局对象或模块类的实例
class_obj = self.__get_class_instance(class_name)
if class_obj and hasattr(class_obj, method_name):
method = getattr(class_obj, method_name)
if is_broadcast_event:
self.__executor.submit(method, event_to_process)
else:
method(event_to_process)
self.__invoke_handler_by_type_sync(handler, event_to_process, is_broadcast_event)
except Exception as e:
self.__handle_event_error(event, handler, e)
async def __safe_invoke_handler_async(self, handler: Callable, event: Event):
"""
异步调用处理器,处理链式事件
:param handler: 处理器
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
# 链式事件不需要深复制
event_to_process = event
try:
await self.__invoke_handler_by_type_async(handler, event_to_process)
except Exception as e:
self.__handle_event_error(event, handler, e)
def __invoke_handler_by_type_sync(self, handler: Callable, event_to_process: Event, is_broadcast_event: bool):
"""
同步方式根据处理器类型调用相应的方法
:param handler: 处理器
:param event_to_process: 要处理的事件对象
:param is_broadcast_event: 是否为广播事件
"""
class_name, method_name = self.__parse_handler_names(handler)
from app.core.plugin import PluginManager
from app.core.module import ModuleManager
if class_name in PluginManager().get_plugin_ids():
self.__invoke_plugin_method_sync(class_name, method_name, event_to_process, is_broadcast_event)
elif class_name in ModuleManager().get_module_ids():
self.__invoke_module_method_sync(class_name, method_name, event_to_process, is_broadcast_event)
else:
self.__invoke_global_method_sync(class_name, method_name, event_to_process, is_broadcast_event)
async def __invoke_handler_by_type_async(self, handler: Callable, event_to_process: Event):
"""
异步方式根据处理器类型调用相应的方法
:param handler: 处理器
:param event_to_process: 要处理的事件对象
"""
class_name, method_name = self.__parse_handler_names(handler)
from app.core.plugin import PluginManager
from app.core.module import ModuleManager
if class_name in PluginManager().get_plugin_ids():
await self.__invoke_plugin_method_async(class_name, method_name, event_to_process)
elif class_name in ModuleManager().get_module_ids():
await self.__invoke_module_method_async(class_name, method_name, event_to_process)
else:
await self.__invoke_global_method_async(class_name, method_name, event_to_process)
@staticmethod
def __parse_handler_names(handler: Callable) -> Tuple[str, str]:
"""
解析处理器的类名和方法名
:param handler: 处理器
:return: (class_name, method_name)
"""
names = handler.__qualname__.split(".")
return names[0], names[1]
def __invoke_plugin_method_sync(self, class_name: str, method_name: str, event_to_process: Event,
is_broadcast_event: bool):
"""
同步调用插件方法
"""
from app.core.plugin import PluginManager
def plugin_callable():
PluginManager().run_plugin_method(class_name, method_name, event_to_process)
if is_broadcast_event:
self.__executor.submit(plugin_callable)
else:
plugin_callable()
@staticmethod
async def __invoke_plugin_method_async(class_name: str, method_name: str, event_to_process: Event):
"""
异步调用插件方法
"""
from app.core.plugin import PluginManager
plugin_manager = PluginManager()
plugin = plugin_manager.running_plugins.get(class_name)
if plugin and hasattr(plugin, method_name):
method = getattr(plugin, method_name)
if inspect.iscoroutinefunction(method):
await method(event_to_process)
else:
plugin_manager.run_plugin_method(class_name, method_name, event_to_process)
def __invoke_module_method_sync(self, class_name: str, method_name: str, event_to_process: Event,
is_broadcast_event: bool):
"""
同步调用模块方法
"""
from app.core.module import ModuleManager
module = ModuleManager().get_running_module(class_name)
if not module:
return
method = getattr(module, method_name, None)
if not method:
return
if is_broadcast_event:
self.__executor.submit(method, event_to_process)
else:
method(event_to_process)
@staticmethod
async def __invoke_module_method_async(class_name: str, method_name: str, event_to_process: Event):
"""
异步调用模块方法
"""
from app.core.module import ModuleManager
module = ModuleManager().get_running_module(class_name)
if not module:
return
method = getattr(module, method_name, None)
if not method:
return
if inspect.iscoroutinefunction(method):
await method(event_to_process)
else:
method(event_to_process)
def __invoke_global_method_sync(self, class_name: str, method_name: str, event_to_process: Event,
is_broadcast_event: bool):
"""
同步调用全局对象方法
"""
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
return
method = getattr(class_obj, method_name)
if is_broadcast_event:
self.__executor.submit(method, event_to_process)
else:
method(event_to_process)
async def __invoke_global_method_async(self, class_name: str, method_name: str, event_to_process: Event):
"""
异步调用全局对象方法
"""
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
return
method = getattr(class_obj, method_name)
if inspect.iscoroutinefunction(method):
await method(event_to_process)
else:
method(event_to_process)
@staticmethod
def __get_class_instance(class_name: str):
"""

View File

@@ -1,3 +1,4 @@
import asyncio
import concurrent
import concurrent.futures
import importlib.util
@@ -831,6 +832,25 @@ class PluginManager(metaclass=Singleton):
return None
return getattr(plugin, method)(*args, **kwargs)
async def async_run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any:
"""
异步运行插件方法
:param pid: 插件ID
:param method: 方法名
:param args: 参数
:param kwargs: 关键字参数
"""
plugin = self._running_plugins.get(pid)
if not plugin:
return None
if not hasattr(plugin, method):
return None
method_func = getattr(plugin, method)
if asyncio.iscoroutinefunction(method_func):
return await method_func(*args, **kwargs)
else:
return method_func(*args, **kwargs)
def get_plugin_ids(self) -> List[str]:
"""
获取所有插件ID