From 8b75d2312c9a6a52871363d246d9dfd91fc621fc Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 08:56:32 +0800 Subject: [PATCH] add async run_module --- app/chain/__init__.py | 228 +++++++++++++++++++++++------- app/modules/themoviedb/tmdbapi.py | 8 +- 2 files changed, 177 insertions(+), 59 deletions(-) diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 88558d95..c04292f3 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -1,4 +1,5 @@ import copy +import inspect import pickle import traceback from abc import ABCMeta @@ -78,23 +79,66 @@ class ChainBase(metaclass=ABCMeta): if cache_path.exists(): cache_path.unlink() - def run_module(self, method: str, *args, **kwargs) -> Any: + @staticmethod + def __is_valid_empty(ret): """ - 运行包含该方法的所有模块,然后返回结果 - 当kwargs包含命名参数raise_exception时,如模块方法抛出异常且raise_exception为True,则同步抛出异常 + 判断结果是否为空 """ + if isinstance(ret, tuple): + return all(value is None for value in ret) + else: + return ret is None - def is_result_empty(ret): - """ - 判断结果是否为空 - """ - if isinstance(ret, tuple): - return all(value is None for value in ret) - else: - return ret is None + def __handle_plugin_error(self, err: Exception, plugin_id: str, plugin_name: str, method: str, **kwargs): + """ + 处理插件模块执行错误 + """ + if kwargs.get("raise_exception"): + raise + logger.error( + f"运行插件 {plugin_id} 模块 {method} 出错:{str(err)}\n{traceback.format_exc()}") + self.messagehelper.put(title=f"{plugin_name} 发生了错误", + message=str(err), + role="plugin") + self.eventmanager.send_event( + EventType.SystemError, + { + "type": "plugin", + "plugin_id": plugin_id, + "plugin_name": plugin_name, + "plugin_method": method, + "error": str(err), + "traceback": traceback.format_exc() + } + ) - result = None - # 插件模块 + def __handle_system_error(self, err: Exception, module_id: str, module_name: str, method: str, **kwargs): + """ + 处理系统模块执行错误 + """ + if kwargs.get("raise_exception"): + raise + logger.error( + f"运行模块 {module_id}.{method} 出错:{str(err)}\n{traceback.format_exc()}") + self.messagehelper.put(title=f"{module_name}发生了错误", + message=str(err), + role="system") + self.eventmanager.send_event( + EventType.SystemError, + { + "type": "module", + "module_id": module_id, + "module_name": module_name, + "module_method": method, + "error": str(err), + "traceback": traceback.format_exc() + } + ) + + def __execute_plugin_modules(self, method: str, result: Any, *args, **kwargs) -> Any: + """ + 执行插件模块 + """ for plugin, module_dict in self.pluginmanager.get_plugin_modules().items(): plugin_id, plugin_name = plugin if method in module_dict: @@ -102,7 +146,7 @@ class ChainBase(metaclass=ABCMeta): if func: try: logger.info(f"请求插件 {plugin_name} 执行:{method} ...") - if is_result_empty(result): + if self.__is_valid_empty(result): # 返回None,第一次执行或者需继续执行下一模块 result = func(*args, **kwargs) elif isinstance(result, list): @@ -113,29 +157,44 @@ class ChainBase(metaclass=ABCMeta): else: break except Exception as err: - if kwargs.get("raise_exception"): - raise - logger.error( - f"运行插件 {plugin_id} 模块 {method} 出错:{str(err)}\n{traceback.format_exc()}") - self.messagehelper.put(title=f"{plugin_name} 发生了错误", - message=str(err), - role="plugin") - self.eventmanager.send_event( - EventType.SystemError, - { - "type": "plugin", - "plugin_id": plugin_id, - "plugin_name": plugin_name, - "plugin_method": method, - "error": str(err), - "traceback": traceback.format_exc() - } - ) - if not is_result_empty(result) and not isinstance(result, list): - # 插件模块返回结果不为空且不是列表,直接返回 - return result + self.__handle_plugin_error(err, plugin_id, plugin_name, method, **kwargs) + return result - # 系统模块 + async def __async_execute_plugin_modules(self, method: str, result: Any, *args, **kwargs) -> Any: + """ + 异步执行插件模块 + """ + for plugin, module_dict in self.pluginmanager.get_plugin_modules().items(): + plugin_id, plugin_name = plugin + if method in module_dict: + func = module_dict[method] + if func: + try: + logger.info(f"请求插件 {plugin_name} 执行:{method} ...") + if self.__is_valid_empty(result): + # 返回None,第一次执行或者需继续执行下一模块 + if inspect.iscoroutinefunction(func): + result = await func(*args, **kwargs) + else: + result = func(*args, **kwargs) + elif isinstance(result, list): + # 返回为列表,有多个模块运行结果时进行合并 + if inspect.iscoroutinefunction(func): + temp = await func(*args, **kwargs) + else: + temp = func(*args, **kwargs) + if isinstance(temp, list): + result.extend(temp) + else: + break + except Exception as err: + self.__handle_plugin_error(err, plugin_id, plugin_name, method, **kwargs) + return result + + def __execute_system_modules(self, method: str, result: Any, *args, **kwargs) -> Any: + """ + 执行系统模块 + """ logger.debug(f"请求系统模块执行:{method} ...") for module in sorted(self.modulemanager.get_running_modules(method), key=lambda x: x.get_priority()): module_id = module.__class__.__name__ @@ -146,7 +205,7 @@ class ChainBase(metaclass=ABCMeta): module_name = module_id try: func = getattr(module, method) - if is_result_empty(result): + if self.__is_valid_empty(result): # 返回None,第一次执行或者需继续执行下一模块 result = func(*args, **kwargs) elif ObjectUtils.check_signature(func, result): @@ -161,26 +220,85 @@ class ChainBase(metaclass=ABCMeta): # 中止继续执行 break except Exception as err: - if kwargs.get("raise_exception"): - raise - logger.error( - f"运行模块 {module_id}.{method} 出错:{str(err)}\n{traceback.format_exc()}") - self.messagehelper.put(title=f"{module_name}发生了错误", - message=str(err), - role="system") - self.eventmanager.send_event( - EventType.SystemError, - { - "type": "module", - "module_id": module_id, - "module_name": module_name, - "module_method": method, - "error": str(err), - "traceback": traceback.format_exc() - } - ) + self.__handle_system_error(err, module_id, module_name, method, **kwargs) return result + async def __async_execute_system_modules(self, method: str, result: Any, *args, **kwargs) -> Any: + """ + 异步执行系统模块 + """ + logger.debug(f"请求系统模块执行:{method} ...") + for module in sorted(self.modulemanager.get_running_modules(method), key=lambda x: x.get_priority()): + module_id = module.__class__.__name__ + try: + module_name = module.get_name() + except Exception as err: + logger.debug(f"获取模块名称出错:{str(err)}") + module_name = module_id + try: + func = getattr(module, method) + if self.__is_valid_empty(result): + # 返回None,第一次执行或者需继续执行下一模块 + if inspect.iscoroutinefunction(func): + result = await func(*args, **kwargs) + else: + result = func(*args, **kwargs) + elif ObjectUtils.check_signature(func, result): + # 返回结果与方法签名一致,将结果传入 + if inspect.iscoroutinefunction(func): + result = await func(result) + else: + result = func(result) + elif isinstance(result, list): + # 返回为列表,有多个模块运行结果时进行合并 + if inspect.iscoroutinefunction(func): + temp = await func(*args, **kwargs) + else: + temp = func(*args, **kwargs) + if isinstance(temp, list): + result.extend(temp) + else: + # 中止继续执行 + break + except Exception as err: + self.__handle_system_error(err, module_id, module_name, method, **kwargs) + return result + + def run_module(self, method: str, *args, **kwargs) -> Any: + """ + 运行包含该方法的所有模块,然后返回结果 + 当kwargs包含命名参数raise_exception时,如模块方法抛出异常且raise_exception为True,则同步抛出异常 + """ + result = None + + # 执行插件模块 + result = self.__execute_plugin_modules(method, result, *args, **kwargs) + + if not self.__is_valid_empty(result) and not isinstance(result, list): + # 插件模块返回结果不为空且不是列表,直接返回 + return result + + # 执行系统模块 + return self.__execute_system_modules(method, result, *args, **kwargs) + + async def async_run_module(self, method: str, *args, **kwargs) -> Any: + """ + 异步运行包含该方法的所有模块,然后返回结果 + 当kwargs包含命名参数raise_exception时,如模块方法抛出异常且raise_exception为True,则同步抛出异常 + 支持异步和同步方法的混合调用 + """ + result = None + + # 执行插件模块 + result = await self.__async_execute_plugin_modules(method, result, *args, **kwargs) + + if not self.__is_valid_empty(result) and not isinstance(result, list): + # 插件模块返回结果不为空且不是列表,直接返回 + return result + + # 执行系统模块 + return await self.__async_execute_system_modules(method, result, *args, **kwargs) + def recognize_media(self, meta: MetaBase = None, mtype: Optional[MediaType] = None, tmdbid: Optional[int] = None, diff --git a/app/modules/themoviedb/tmdbapi.py b/app/modules/themoviedb/tmdbapi.py index 4f712d51..2c4c1da1 100644 --- a/app/modules/themoviedb/tmdbapi.py +++ b/app/modules/themoviedb/tmdbapi.py @@ -2066,8 +2066,8 @@ class TmdbApi: logger.error(str(e)) return [] - async def async_get_movie_credits(self, tmdbid: int, page: Optional[int] = 1, count: Optional[int] = 24) -> List[ - dict]: + async def async_get_movie_credits(self, tmdbid: int, + page: Optional[int] = 1, count: Optional[int] = 24) -> List[dict]: """ 获取电影的演职员列表(异步版本) """ @@ -2150,8 +2150,8 @@ class TmdbApi: logger.error(str(e)) return {} - async def async_get_person_credits(self, person_id: int, page: Optional[int] = 1, count: Optional[int] = 24) -> \ - List[dict]: + async def async_get_person_credits(self, person_id: int, + page: Optional[int] = 1, count: Optional[int] = 24) -> List[dict]: """ 获取人物参演作品(异步版本) """