Files
MoviePilot/app/core/event.py
2025-11-20 08:15:37 +08:00

759 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import importlib
import inspect
import random
import threading
import time
import traceback
import uuid
from queue import Empty, PriorityQueue
from typing import Callable, Dict, List, Optional, Tuple, Union, Any
from fastapi.concurrency import run_in_threadpool
from app.core.config import global_vars
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas import ChainEventData
from app.schemas.types import ChainEventType, EventType
from app.utils.limit import ExponentialBackoffRateLimiter
from app.utils.singleton import Singleton
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 5 # 事件队列空闲时的最大超时时间(秒)
class Event:
"""
事件类,封装事件的基本信息
"""
def __init__(self, event_type: Union[EventType, ChainEventType],
event_data: Optional[Union[Dict, ChainEventData]] = None,
priority: Optional[int] = DEFAULT_EVENT_PRIORITY):
"""
:param event_type: 事件的类型,支持 EventType 或 ChainEventType
:param event_data: 可选,事件携带的数据,默认为空字典
:param priority: 可选,事件的优先级,默认为 10
"""
self.event_id = str(uuid.uuid4()) # 事件ID
self.event_type = event_type # 事件类型
self.event_data = event_data or {} # 事件数据
self.priority = priority # 事件优先级
def __repr__(self) -> str:
"""
重写 __repr__ 方法用于返回事件的详细信息包括事件类型、事件ID和优先级
"""
event_kind = Event.get_event_kind(self.event_type)
return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>"
def __lt__(self, other):
"""
定义事件对象的比较规则,基于优先级比较
优先级小的事件会被认为“更小”,优先级高的事件将被认为“更大”
"""
return self.priority < other.priority
@staticmethod
def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str:
"""
根据事件类型判断事件是广播事件还是链式事件
:param event_type: 事件类型,支持 EventType 或 ChainEventType
:return: 返回 Broadcast Event 或 Chain Event
"""
return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event"
class EventManager(metaclass=Singleton):
"""
EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件
"""
def __init__(self):
# 动态线程池,用于消费事件
self.__executor = ThreadHelper()
# 用于保存启动的事件消费者线程
self.__consumer_threads = []
# 优先级队列
self.__event_queue = PriorityQueue()
# 广播事件的订阅者
self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {}
# 链式事件的订阅者
self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {}
# 禁用的事件处理器集合
self.__disabled_handlers = set()
# 禁用的事件处理器类集合
self.__disabled_classes = set()
# 线程锁
self.__lock = threading.Lock()
# 退出事件
self.__event = threading.Event()
def start(self):
"""
开始广播事件处理线程
"""
# 启动消费者线程用于处理广播事件
self.__event.set()
for _ in range(MIN_EVENT_CONSUMER_THREADS):
thread = threading.Thread(target=self.__broadcast_consumer_loop, daemon=True)
thread.start()
self.__consumer_threads.append(thread) # 将线程对象保存到列表中
def stop(self):
"""
停止广播事件处理线程
"""
logger.info("正在停止事件处理...")
self.__event.clear() # 停止广播事件处理
try:
# 通过遍历保存的线程来等待它们完成
for consumer_thread in self.__consumer_threads:
consumer_thread.join()
logger.info("事件处理停止完成")
except Exception as e:
logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}")
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
"""
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 ChainEventType)
:return: 返回是否存在可用的处理器
"""
if isinstance(etype, ChainEventType):
handlers = self.__chain_subscribers.get(etype, {})
return any(
self.__is_handler_enabled(handler)
for _, handler in handlers.values()
)
else:
handlers = self.__broadcast_subscribers.get(etype, {})
return any(
self.__is_handler_enabled(handler)
for handler in handlers.values()
)
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Union[Dict, ChainEventData]] = None,
priority: Optional[int] = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
"""
发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 ChainEventType)
:param data: 可选,事件数据
:param priority: 广播事件的优先级,默认为 10
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
"""
event = Event(etype, data, priority)
if isinstance(etype, EventType):
return self.__trigger_broadcast_event(event)
elif isinstance(etype, ChainEventType):
return self.__trigger_chain_event(event)
else:
logger.error(f"Unknown event type: {etype}")
return None
async def async_send_event(self, etype: Union[EventType, ChainEventType],
data: Optional[Union[Dict, ChainEventData]] = None,
priority: Optional[int] = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
"""
异步发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 ChainEventType)
:param data: 可选,事件数据
:param priority: 广播事件的优先级,默认为 10
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
"""
event = Event(etype, data, priority)
if isinstance(etype, EventType):
return self.__trigger_broadcast_event(event)
elif isinstance(etype, ChainEventType):
return await self.__trigger_chain_event_async(event)
else:
logger.error(f"Unknown event type: {etype}")
return None
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
priority: Optional[int] = DEFAULT_EVENT_PRIORITY):
"""
注册事件处理器,将处理器添加到对应的事件订阅列表中
:param event_type: 事件类型 (EventType 或 ChainEventType)
:param handler: 处理器
:param priority: 可选,链式事件的优先级,默认为 10广播事件不需要优先级
"""
with self.__lock:
handler_identifier = self.__get_handler_identifier(handler)
if isinstance(event_type, ChainEventType):
# 链式事件,按优先级排序
if event_type not in self.__chain_subscribers:
self.__chain_subscribers[event_type] = {}
handlers = self.__chain_subscribers[event_type]
if handler_identifier in handlers:
handlers.pop(handler_identifier)
else:
logger.debug(
f"Subscribed to chain event: {event_type.value}, "
f"Priority: {priority} - {handler_identifier}")
handlers[handler_identifier] = (priority, handler)
# 根据优先级排序
self.__chain_subscribers[event_type] = dict(
sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0])
)
else:
# 广播事件
if event_type not in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type] = {}
handlers = self.__broadcast_subscribers[event_type]
if handler_identifier in handlers:
handlers.pop(handler_identifier)
else:
logger.debug(f"Subscribed to broadcast event: {event_type.value} - {handler_identifier}")
handlers[handler_identifier] = handler
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
"""
移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 ChainEventType)
:param handler: 要移除的处理器
"""
with self.__lock:
handler_identifier = self.__get_handler_identifier(handler)
if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers:
self.__chain_subscribers[event_type].pop(handler_identifier, None)
logger.debug(f"Unsubscribed from chain event: {event_type.value} - {handler_identifier}")
elif event_type in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type].pop(handler_identifier, None)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value} - {handler_identifier}")
def disable_event_handler(self, target: Union[Callable, type]):
"""
禁用指定的事件处理器或事件处理器类
:param target: 处理器函数或类
"""
identifier = self.__get_handler_identifier(target)
if identifier in self.__disabled_handlers or identifier in self.__disabled_classes:
return
if isinstance(target, type):
self.__disabled_classes.add(identifier)
logger.debug(f"Disabled event handler class - {identifier}")
else:
self.__disabled_handlers.add(identifier)
logger.debug(f"Disabled event handler - {identifier}")
def enable_event_handler(self, target: Union[Callable, type]):
"""
启用指定的事件处理器或事件处理器类
:param target: 处理器函数或类
"""
identifier = self.__get_handler_identifier(target)
if isinstance(target, type):
self.__disabled_classes.discard(identifier)
logger.debug(f"Enabled event handler class - {identifier}")
else:
self.__disabled_handlers.discard(identifier)
logger.debug(f"Enabled event handler - {identifier}")
def visualize_handlers(self) -> List[Dict]:
"""
可视化所有事件处理器,包括是否被禁用的状态
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
"""
def parse_handler_data(data):
"""
解析处理器数据,判断是否包含优先级
:param data: 订阅者数据,可能是元组或单一值
:return: (priority, handler),若没有优先级则返回 (None, handler)
"""
if isinstance(data, tuple) and len(data) == 2:
return data
return None, data
handler_info = []
# 统一处理广播事件和链式事件
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
for handler_identifier, handler_data in subscribers.items():
# 解析优先级和处理器
priority, handler = parse_handler_data(handler_data)
# 检查处理器的启用状态
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
# 构建处理器信息字典
handler_dict = {
"event_type": event_type.value,
"handler_identifier": handler_identifier,
"status": status
}
if priority is not None:
handler_dict["priority"] = priority
handler_info.append(handler_dict)
return handler_info
@classmethod
def __get_handler_identifier(cls, target: Union[Callable, type]) -> Optional[str]:
"""
获取处理器或处理器类的唯一标识符,包括模块名和类名/方法名
:param target: 处理器函数或类
:return: 唯一标识符
"""
# 统一使用 inspect.getmodule 来获取模块名
module = inspect.getmodule(target)
module_name = module.__name__ if module else "unknown_module"
# 使用 __qualname__ 获取目标的限定名
qualname = target.__qualname__
return f"{module_name}.{qualname}"
@classmethod
def __get_class_from_callable(cls, handler: Callable) -> Optional[str]:
"""
获取可调用对象所属类的唯一标识符
:param handler: 可调用对象(函数、方法等)
:return: 类的唯一标识符
"""
# 对于绑定方法,通过 __self__.__class__ 获取类
if inspect.ismethod(handler) and hasattr(handler, "__self__"):
return cls.__get_handler_identifier(handler.__self__.__class__)
# 对于类实例(实现了 __call__ 方法)
if not inspect.isfunction(handler) and hasattr(handler, "__call__"):
handler_cls = handler.__class__ # noqa
return cls.__get_handler_identifier(handler_cls)
# 对于未绑定方法、静态方法、类方法,使用 __qualname__ 提取类信息
qualname_parts = handler.__qualname__.split(".")
if len(qualname_parts) > 1:
class_name = ".".join(qualname_parts[:-1])
module = inspect.getmodule(handler)
module_name = module.__name__ if module else "unknown_module"
return f"{module_name}.{class_name}"
return None
def __is_handler_enabled(self, handler: Callable) -> bool:
"""
检查处理器是否已启用(没有被禁用)
:param handler: 处理器函数
:return: 如果处理器启用则返回 True否则返回 False
"""
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
# 获取处理器所属类的唯一标识符
class_id = self.__get_class_from_callable(handler)
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
return False
return True
def __trigger_chain_event(self, event: Event) -> Optional[Event]:
"""
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
"""
logger.debug(f"Triggering synchronous chain event: {event}")
dispatch = self.__dispatch_chain_event(event)
return event if dispatch else None
async def __trigger_chain_event_async(self, event: Event) -> Optional[Event]:
"""
异步触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
"""
logger.debug(f"Triggering asynchronous chain event: {event}")
dispatch = await self.__dispatch_chain_event_async(event)
return event if dispatch else None
def __trigger_broadcast_event(self, event: Event):
"""
触发广播事件,将事件插入到优先级队列中
:param event: 要处理的事件对象
"""
logger.debug(f"Triggering broadcast event: {event}")
self.__event_queue.put((event.priority, event))
def __dispatch_chain_event(self, event: Event) -> bool:
"""
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, {})
if not handlers:
logger.debug(f"No handlers found for chain event: {event}")
return False
# 过滤出启用的处理器
enabled_handlers = {handler_id: (priority, handler) for handler_id, (priority, handler) in handlers.items()
if self.__is_handler_enabled(handler)}
if not enabled_handlers:
logger.debug(f"No enabled handlers found for chain event: {event}. Skipping execution.")
return False
self.__log_event_lifecycle(event, "Started")
for handler_id, (priority, handler) in enabled_handlers.items():
start_time = time.time()
self.__safe_invoke_handler(handler, event)
logger.debug(
f"{self.__get_handler_identifier(handler)} (Priority: {priority}), "
f"completed in {time.time() - start_time:.3f}s for event: {event}"
)
self.__log_event_lifecycle(event, "Completed")
return True
async def __dispatch_chain_event_async(self, event: Event) -> bool:
"""
异步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, {})
if not handlers:
logger.debug(f"No handlers found for chain event: {event}")
return False
# 过滤出启用的处理器
enabled_handlers = {handler_id: (priority, handler) for handler_id, (priority, handler) in handlers.items()
if self.__is_handler_enabled(handler)}
if not enabled_handlers:
logger.debug(f"No enabled handlers found for chain event: {event}. Skipping execution.")
return False
self.__log_event_lifecycle(event, "Started")
for handler_id, (priority, handler) in enabled_handlers.items():
start_time = time.time()
await self.__safe_invoke_handler_async(handler, event)
logger.debug(
f"{self.__get_handler_identifier(handler)} (Priority: {priority}), "
f"completed in {time.time() - start_time:.3f}s for event: {event}"
)
self.__log_event_lifecycle(event, "Completed")
return True
def __dispatch_broadcast_event(self, event: Event):
"""
异步方式调度广播事件,通过线程池逐个调用事件处理器
:param event: 要调度的事件对象
"""
handlers = self.__broadcast_subscribers.get(event.event_type, {})
if not handlers:
logger.debug(f"No handlers found for broadcast event: {event}")
return
# 为每个处理器提供独立的事件实例,防止某个处理器对 event_data 的修改影响其他处理器
for handler_id, handler in handlers.items():
# 仅浅拷贝顶层字典,避免不必要的深拷贝开销;这样可以隔离键级别的替换/赋值
if isinstance(event.event_data, dict):
event_data_copy = event.event_data.copy()
else:
event_data_copy = event.event_data
isolated_event = Event(event_type=event.event_type,
event_data=event_data_copy,
priority=event.priority)
if inspect.iscoroutinefunction(handler):
# 对于异步函数,直接在事件循环中运行
asyncio.run_coroutine_threadsafe(
self.__safe_invoke_handler_async(handler, isolated_event),
global_vars.loop
)
else:
# 对于同步函数,在线程池中运行
self.__executor.submit(self.__safe_invoke_handler, handler, isolated_event)
def __safe_invoke_handler(self, handler: Callable, event: Event):
"""
调用处理器,处理链式或广播事件
:param handler: 处理器
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
self.__invoke_handler_by_type_sync(handler, event)
async def __safe_invoke_handler_async(self, handler: Callable, event: Event):
"""
异步调用处理器,处理链式事件
:param handler: 处理器
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
return
await self.__invoke_handler_by_type_async(handler, event)
def __invoke_handler_by_type_sync(self, handler: Callable, event: Event):
"""
同步方式根据处理器类型调用相应的方法
:param handler: 处理器
:param event: 要处理的事件对象
"""
class_name, method_name = self.__parse_handler_names(handler)
from app.core.plugin import PluginManager
from app.core.module import ModuleManager
plugin_manager = PluginManager()
module_manager = ModuleManager()
if class_name in plugin_manager.get_plugin_ids():
# 插件处理器
plugin = plugin_manager.running_plugins.get(class_name)
if not plugin:
return
method = getattr(plugin, method_name, None)
if not method:
return
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=plugin.name,
class_name=class_name, method_name=method_name, e=e)
elif class_name in module_manager.get_module_ids():
# 模块处理器
module = module_manager.get_running_module(class_name)
if not module:
return
method = getattr(module, method_name, None)
if not method:
return
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=module.get_name(),
class_name=class_name, method_name=method_name, e=e)
else:
# 全局处理器
class_obj = self.__get_class_instance(class_name)
if not class_obj or not hasattr(class_obj, method_name):
return
method = getattr(class_obj, method_name, None)
if not method:
return
try:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=class_name,
class_name=class_name, method_name=method_name, e=e)
async def __invoke_handler_by_type_async(self, handler: Callable, event: Event):
"""
异步方式根据处理器类型调用相应的方法
:param handler: 处理器
:param event: 要处理的事件对象
"""
class_name, method_name = self.__parse_handler_names(handler)
from app.core.plugin import PluginManager
from app.core.module import ModuleManager
plugin_manager = PluginManager()
module_manager = ModuleManager()
if class_name in plugin_manager.get_plugin_ids():
await self.__invoke_plugin_method_async(plugin_manager, class_name, method_name, event)
elif class_name in module_manager.get_module_ids():
await self.__invoke_module_method_async(module_manager, class_name, method_name, event)
else:
await self.__invoke_global_method_async(class_name, method_name, event)
@staticmethod
def __parse_handler_names(handler: Callable) -> Tuple[str, str]:
"""
解析处理器的类名和方法名
:param handler: 处理器
:return: (class_name, method_name)
"""
names = handler.__qualname__.split(".")
return names[0], names[1]
async def __invoke_plugin_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
"""
异步调用插件方法
"""
plugin = handler.running_plugins.get(class_name)
if not plugin:
return
method = getattr(plugin, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
# 插件同步函数在异步环境中运行,避免阻塞
await run_in_threadpool(method, event)
except Exception as e:
self.__handle_event_error(event=event, module_name=plugin.name,
class_name=class_name, method_name=method_name, e=e)
async def __invoke_module_method_async(self, handler: Any, class_name: str, method_name: str, event: Event):
"""
异步调用模块方法
"""
module = handler.get_running_module(class_name)
if not module:
return
method = getattr(module, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=module.get_name(),
class_name=class_name, method_name=method_name, e=e)
async def __invoke_global_method_async(self, class_name: str, method_name: str, event: Event):
"""
异步调用全局对象方法
"""
class_obj = self.__get_class_instance(class_name)
if not class_obj:
return
method = getattr(class_obj, method_name, None)
if not method:
return
try:
if inspect.iscoroutinefunction(method):
await method(event)
else:
method(event)
except Exception as e:
self.__handle_event_error(event=event, module_name=class_name,
class_name=class_name, method_name=method_name, e=e)
@staticmethod
def __get_class_instance(class_name: str):
"""
根据类名获取类实例,首先检查全局变量中是否存在该类,如果不存在则尝试动态导入模块。
:param class_name: 类的名称
:return: 类的实例
"""
# 检查类是否在全局变量中
if class_name in globals():
try:
class_obj = globals()[class_name]()
return class_obj
except Exception as e:
logger.error(f"事件处理出错:创建全局类实例出错:{str(e)} - {traceback.format_exc()}")
return None
# 如果类不在全局变量中,尝试动态导入模块并创建实例
try:
if class_name.endswith("Manager"):
module_name = f"app.core.{class_name[:-7].lower()}"
module = importlib.import_module(module_name)
elif class_name.endswith("Chain"):
module_name = f"app.chain.{class_name[:-5].lower()}"
module = importlib.import_module(module_name)
elif class_name.endswith("Helper"):
# 特殊处理 Async 类
if class_name.startswith("Async"):
module_name = f"app.helper.{class_name[5:-6].lower()}"
else:
module_name = f"app.helper.{class_name[:-6].lower()}"
module = importlib.import_module(module_name)
else:
module_name = f"app.{class_name.lower()}"
module = importlib.import_module(module_name)
if hasattr(module, class_name):
class_obj = getattr(module, class_name)()
return class_obj
else:
logger.debug(f"事件处理出错:模块 {module_name} 中没有找到类 {class_name}")
except Exception as e:
logger.debug(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
return None
def __broadcast_consumer_loop(self):
"""
持续从队列中提取事件的后台广播消费者线程
"""
jitter_factor = 0.1
rate_limiter = ExponentialBackoffRateLimiter(base_wait=INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS,
max_wait=MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS,
backoff_factor=2.0,
source="BroadcastConsumer",
enable_logging=False)
while self.__event.is_set():
try:
priority, event = self.__event_queue.get(timeout=rate_limiter.current_wait)
rate_limiter.reset()
self.__dispatch_broadcast_event(event)
except Empty:
rate_limiter.current_wait = rate_limiter.current_wait * random.uniform(1, 1 + jitter_factor)
rate_limiter.trigger_limit()
@staticmethod
def __log_event_lifecycle(event: Event, stage: str):
"""
记录事件的生命周期日志
"""
logger.debug(f"{stage} - {event}")
def __handle_event_error(self, event: Event, module_name: str,
class_name: str, method_name: str, e: Exception):
"""
全局错误处理器,用于处理事件处理中的异常
"""
logger.error(f"{module_name} 事件处理出错:{str(e)} - {traceback.format_exc()}")
# 发送系统错误通知
from app.helper.message import MessageHelper
MessageHelper().put(title=f"{module_name} 处理事件 {event.event_type} 时出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.send_event(
EventType.SystemError,
{
"type": "event",
"event_type": event.event_type,
"event_handle": f"{class_name}.{method_name}",
"error": str(e),
"traceback": traceback.format_exc()
}
)
def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type],
priority: Optional[int] = DEFAULT_EVENT_PRIORITY):
"""
事件注册装饰器,用于将函数注册为事件的处理器
:param etype:
- 单个事件类型成员 (如 EventType.MetadataScrape, ChainEventType.PluginAction)
- 事件类型类 (EventType, ChainEventType)
- 或事件类型成员的列表
:param priority: 可选,链式事件的优先级,默认为 DEFAULT_EVENT_PRIORITY
"""
def decorator(f: Callable):
# 将输入的事件类型统一转换为列表格式
if isinstance(etype, list):
# 传入的已经是列表,直接使用
event_list = etype
else:
# 不是列表则包裹成单一元素的列表
event_list = [etype]
# 遍历列表,处理每个事件类型
for event in event_list:
if isinstance(event, (EventType, ChainEventType)):
self.add_event_listener(event, f, priority)
elif isinstance(event, type) and issubclass(event, (EventType, ChainEventType)):
# 如果是 EventType 或 ChainEventType 类,提取该类中的所有成员
for et in event.__members__.values():
self.add_event_listener(et, f, priority)
else:
raise ValueError(f"无效的事件类型: {event}")
return f
return decorator
# 全局实例定义
eventmanager = EventManager()