From aea44c1d97d284e91487a5c7a923e8b62b0e844d Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 17:27:15 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E9=94=AE=E5=BC=8F=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E5=8D=8F=E7=A8=8B=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/media.py | 7 +- app/chain/__init__.py | 2 +- app/chain/media.py | 2 +- app/core/event.py | 261 ++++++++++++++++++++++++++++++++----- app/core/plugin.py | 20 +++ 5 files changed, 251 insertions(+), 41 deletions(-) diff --git a/app/api/endpoints/media.py b/app/api/endpoints/media.py index 4a1c75bc..9ab925f0 100644 --- a/app/api/endpoints/media.py +++ b/app/api/endpoints/media.py @@ -86,14 +86,15 @@ async def search(title: str, return obj.source result = [] + media_chain = MediaChain() if type == "media": - _, medias = await MediaChain().async_search(title=title) + _, medias = await media_chain.async_search(title=title) if medias: result = [media.to_dict() for media in medias] elif type == "collection": - result = await MediaChain().async_search_collections(name=title) + result = await media_chain.async_search_collections(name=title) else: - result = await MediaChain().async_search_persons(name=title) + result = await media_chain.async_search_persons(name=title) if result: # 按设置的顺序对结果进行排序 setting_order = settings.SEARCH_SOURCE.split(',') or [] diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 6d4fe5a7..bdae04d7 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -473,7 +473,7 @@ class ChainBase(metaclass=ABCMeta): :param raise_exception: 触发速率限制时是否抛出异常 """ return await self.async_run_module("async_douban_info", doubanid=doubanid, mtype=mtype, - raise_exception=raise_exception) + raise_exception=raise_exception) def tvdb_info(self, tvdbid: int) -> Optional[dict]: """ diff --git a/app/chain/media.py b/app/chain/media.py index 24a67b45..2abd7017 100644 --- a/app/chain/media.py +++ b/app/chain/media.py @@ -794,7 +794,7 @@ class MediaChain(ChainBase): :param org_meta: 原始元数据 """ # 发送请求事件,等待结果 - result: Event = eventmanager.send_event( + result: Event = await eventmanager.async_send_event( ChainEventType.NameRecognize, { 'title': title, diff --git a/app/core/event.py b/app/core/event.py index 6eb35aa7..db7ce6ef 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -7,7 +7,7 @@ import time import traceback import uuid from queue import Empty, PriorityQueue -from typing import Callable, Dict, List, Optional, Union +from typing import Callable, Dict, List, Optional, Tuple, Union from app.helper.thread import ThreadHelper from app.log import logger @@ -142,6 +142,25 @@ class EventManager(metaclass=Singleton): logger.error(f"Unknown event type: {etype}") return None + async def async_send_event(self, etype: Union[EventType, ChainEventType], + data: Optional[Union[Dict, ChainEventData]] = None, + priority: Optional[int] = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: + """ + 异步发送事件,根据事件类型决定是广播事件还是链式事件 + :param etype: 事件类型 (EventType 或 ChainEventType) + :param data: 可选,事件数据 + :param priority: 广播事件的优先级,默认为 10 + :return: 如果是链式事件,返回处理后的事件数据;否则返回 None + """ + event = Event(etype, data, priority) + if isinstance(etype, EventType): + return self.__trigger_broadcast_event(event) + elif isinstance(etype, ChainEventType): + return await self.__trigger_chain_event_async(event) + else: + logger.error(f"Unknown event type: {etype}") + return None + def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable, priority: Optional[int] = DEFAULT_EVENT_PRIORITY): """ @@ -325,6 +344,14 @@ class EventManager(metaclass=Singleton): dispatch = self.__dispatch_chain_event(event) return event if dispatch else None + async def __trigger_chain_event_async(self, event: Event) -> Optional[Event]: + """ + 异步触发链式事件,按顺序调用订阅的处理器,并记录处理耗时 + """ + logger.debug(f"Triggering asynchronous chain event: {event}") + dispatch = await self.__dispatch_chain_event_async(event) + return event if dispatch else None + def __trigger_broadcast_event(self, event: Event): """ 触发广播事件,将事件插入到优先级队列中 @@ -362,6 +389,35 @@ class EventManager(metaclass=Singleton): self.__log_event_lifecycle(event, "Completed") return True + async def __dispatch_chain_event_async(self, event: Event) -> bool: + """ + 异步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间 + :param event: 要调度的事件对象 + """ + handlers = self.__chain_subscribers.get(event.event_type, {}) + if not handlers: + logger.debug(f"No handlers found for chain event: {event}") + return False + + # 过滤出启用的处理器 + enabled_handlers = {handler_id: (priority, handler) for handler_id, (priority, handler) in handlers.items() + if self.__is_handler_enabled(handler)} + + if not enabled_handlers: + logger.debug(f"No enabled handlers found for chain event: {event}. Skipping execution.") + return False + + self.__log_event_lifecycle(event, "Started") + for handler_id, (priority, handler) in enabled_handlers.items(): + start_time = time.time() + await self.__safe_invoke_handler_async(handler, event) + logger.debug( + f"{self.__get_handler_identifier(handler)} (Priority: {priority}), " + f"completed in {time.time() - start_time:.3f}s for event: {event}" + ) + self.__log_event_lifecycle(event, "Completed") + return True + def __dispatch_broadcast_event(self, event: Event): """ 异步方式调度广播事件,通过线程池逐个调用事件处理器 @@ -388,45 +444,178 @@ class EventManager(metaclass=Singleton): is_broadcast_event = isinstance(event.event_type, EventType) event_to_process = copy.deepcopy(event) if is_broadcast_event else event - names = handler.__qualname__.split(".") - class_name, method_name = names[0], names[1] - try: - from app.core.plugin import PluginManager - from app.core.module import ModuleManager - - if class_name in PluginManager().get_plugin_ids(): - def plugin_callable(): - """ - 插件调用函数 - """ - PluginManager().run_plugin_method(class_name, method_name, event_to_process) - - if is_broadcast_event: - self.__executor.submit(plugin_callable) - else: - plugin_callable() - elif class_name in ModuleManager().get_module_ids(): - module = ModuleManager().get_running_module(class_name) - if module: - method = getattr(module, method_name, None) - if method: - if is_broadcast_event: - self.__executor.submit(method, event_to_process) - else: - method(event_to_process) - else: - # 获取全局对象或模块类的实例 - class_obj = self.__get_class_instance(class_name) - if class_obj and hasattr(class_obj, method_name): - method = getattr(class_obj, method_name) - if is_broadcast_event: - self.__executor.submit(method, event_to_process) - else: - method(event_to_process) + self.__invoke_handler_by_type_sync(handler, event_to_process, is_broadcast_event) except Exception as e: self.__handle_event_error(event, handler, e) + async def __safe_invoke_handler_async(self, handler: Callable, event: Event): + """ + 异步调用处理器,处理链式事件 + :param handler: 处理器 + :param event: 事件对象 + """ + if not self.__is_handler_enabled(handler): + logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution") + return + + # 链式事件不需要深复制 + event_to_process = event + + try: + await self.__invoke_handler_by_type_async(handler, event_to_process) + except Exception as e: + self.__handle_event_error(event, handler, e) + + def __invoke_handler_by_type_sync(self, handler: Callable, event_to_process: Event, is_broadcast_event: bool): + """ + 同步方式根据处理器类型调用相应的方法 + :param handler: 处理器 + :param event_to_process: 要处理的事件对象 + :param is_broadcast_event: 是否为广播事件 + """ + class_name, method_name = self.__parse_handler_names(handler) + + from app.core.plugin import PluginManager + from app.core.module import ModuleManager + + if class_name in PluginManager().get_plugin_ids(): + self.__invoke_plugin_method_sync(class_name, method_name, event_to_process, is_broadcast_event) + elif class_name in ModuleManager().get_module_ids(): + self.__invoke_module_method_sync(class_name, method_name, event_to_process, is_broadcast_event) + else: + self.__invoke_global_method_sync(class_name, method_name, event_to_process, is_broadcast_event) + + async def __invoke_handler_by_type_async(self, handler: Callable, event_to_process: Event): + """ + 异步方式根据处理器类型调用相应的方法 + :param handler: 处理器 + :param event_to_process: 要处理的事件对象 + """ + class_name, method_name = self.__parse_handler_names(handler) + + from app.core.plugin import PluginManager + from app.core.module import ModuleManager + + if class_name in PluginManager().get_plugin_ids(): + await self.__invoke_plugin_method_async(class_name, method_name, event_to_process) + elif class_name in ModuleManager().get_module_ids(): + await self.__invoke_module_method_async(class_name, method_name, event_to_process) + else: + await self.__invoke_global_method_async(class_name, method_name, event_to_process) + + @staticmethod + def __parse_handler_names(handler: Callable) -> Tuple[str, str]: + """ + 解析处理器的类名和方法名 + :param handler: 处理器 + :return: (class_name, method_name) + """ + names = handler.__qualname__.split(".") + return names[0], names[1] + + def __invoke_plugin_method_sync(self, class_name: str, method_name: str, event_to_process: Event, + is_broadcast_event: bool): + """ + 同步调用插件方法 + """ + from app.core.plugin import PluginManager + + def plugin_callable(): + PluginManager().run_plugin_method(class_name, method_name, event_to_process) + + if is_broadcast_event: + self.__executor.submit(plugin_callable) + else: + plugin_callable() + + @staticmethod + async def __invoke_plugin_method_async(class_name: str, method_name: str, event_to_process: Event): + """ + 异步调用插件方法 + """ + from app.core.plugin import PluginManager + + plugin_manager = PluginManager() + plugin = plugin_manager.running_plugins.get(class_name) + if plugin and hasattr(plugin, method_name): + method = getattr(plugin, method_name) + if inspect.iscoroutinefunction(method): + await method(event_to_process) + else: + plugin_manager.run_plugin_method(class_name, method_name, event_to_process) + + def __invoke_module_method_sync(self, class_name: str, method_name: str, event_to_process: Event, + is_broadcast_event: bool): + """ + 同步调用模块方法 + """ + from app.core.module import ModuleManager + + module = ModuleManager().get_running_module(class_name) + if not module: + return + + method = getattr(module, method_name, None) + if not method: + return + + if is_broadcast_event: + self.__executor.submit(method, event_to_process) + else: + method(event_to_process) + + @staticmethod + async def __invoke_module_method_async(class_name: str, method_name: str, event_to_process: Event): + """ + 异步调用模块方法 + """ + from app.core.module import ModuleManager + + module = ModuleManager().get_running_module(class_name) + if not module: + return + + method = getattr(module, method_name, None) + if not method: + return + + if inspect.iscoroutinefunction(method): + await method(event_to_process) + else: + method(event_to_process) + + def __invoke_global_method_sync(self, class_name: str, method_name: str, event_to_process: Event, + is_broadcast_event: bool): + """ + 同步调用全局对象方法 + """ + class_obj = self.__get_class_instance(class_name) + if not class_obj or not hasattr(class_obj, method_name): + return + + method = getattr(class_obj, method_name) + + if is_broadcast_event: + self.__executor.submit(method, event_to_process) + else: + method(event_to_process) + + async def __invoke_global_method_async(self, class_name: str, method_name: str, event_to_process: Event): + """ + 异步调用全局对象方法 + """ + class_obj = self.__get_class_instance(class_name) + if not class_obj or not hasattr(class_obj, method_name): + return + + method = getattr(class_obj, method_name) + + if inspect.iscoroutinefunction(method): + await method(event_to_process) + else: + method(event_to_process) + @staticmethod def __get_class_instance(class_name: str): """ diff --git a/app/core/plugin.py b/app/core/plugin.py index d824e600..1cc5180e 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -1,3 +1,4 @@ +import asyncio import concurrent import concurrent.futures import importlib.util @@ -831,6 +832,25 @@ class PluginManager(metaclass=Singleton): return None return getattr(plugin, method)(*args, **kwargs) + async def async_run_plugin_method(self, pid: str, method: str, *args, **kwargs) -> Any: + """ + 异步运行插件方法 + :param pid: 插件ID + :param method: 方法名 + :param args: 参数 + :param kwargs: 关键字参数 + """ + plugin = self._running_plugins.get(pid) + if not plugin: + return None + if not hasattr(plugin, method): + return None + method_func = getattr(plugin, method) + if asyncio.iscoroutinefunction(method_func): + return await method_func(*args, **kwargs) + else: + return method_func(*args, **kwargs) + def get_plugin_ids(self) -> List[str]: """ 获取所有插件ID