mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-14 10:10:20 +08:00
add async run_module
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import copy
|
||||
import inspect
|
||||
import pickle
|
||||
import traceback
|
||||
from abc import ABCMeta
|
||||
@@ -78,23 +79,66 @@ class ChainBase(metaclass=ABCMeta):
|
||||
if cache_path.exists():
|
||||
cache_path.unlink()
|
||||
|
||||
def run_module(self, method: str, *args, **kwargs) -> Any:
|
||||
@staticmethod
|
||||
def __is_valid_empty(ret):
|
||||
"""
|
||||
运行包含该方法的所有模块,然后返回结果
|
||||
当kwargs包含命名参数raise_exception时,如模块方法抛出异常且raise_exception为True,则同步抛出异常
|
||||
判断结果是否为空
|
||||
"""
|
||||
if isinstance(ret, tuple):
|
||||
return all(value is None for value in ret)
|
||||
else:
|
||||
return ret is None
|
||||
|
||||
def is_result_empty(ret):
|
||||
"""
|
||||
判断结果是否为空
|
||||
"""
|
||||
if isinstance(ret, tuple):
|
||||
return all(value is None for value in ret)
|
||||
else:
|
||||
return ret is None
|
||||
def __handle_plugin_error(self, err: Exception, plugin_id: str, plugin_name: str, method: str, **kwargs):
|
||||
"""
|
||||
处理插件模块执行错误
|
||||
"""
|
||||
if kwargs.get("raise_exception"):
|
||||
raise
|
||||
logger.error(
|
||||
f"运行插件 {plugin_id} 模块 {method} 出错:{str(err)}\n{traceback.format_exc()}")
|
||||
self.messagehelper.put(title=f"{plugin_name} 发生了错误",
|
||||
message=str(err),
|
||||
role="plugin")
|
||||
self.eventmanager.send_event(
|
||||
EventType.SystemError,
|
||||
{
|
||||
"type": "plugin",
|
||||
"plugin_id": plugin_id,
|
||||
"plugin_name": plugin_name,
|
||||
"plugin_method": method,
|
||||
"error": str(err),
|
||||
"traceback": traceback.format_exc()
|
||||
}
|
||||
)
|
||||
|
||||
result = None
|
||||
# 插件模块
|
||||
def __handle_system_error(self, err: Exception, module_id: str, module_name: str, method: str, **kwargs):
|
||||
"""
|
||||
处理系统模块执行错误
|
||||
"""
|
||||
if kwargs.get("raise_exception"):
|
||||
raise
|
||||
logger.error(
|
||||
f"运行模块 {module_id}.{method} 出错:{str(err)}\n{traceback.format_exc()}")
|
||||
self.messagehelper.put(title=f"{module_name}发生了错误",
|
||||
message=str(err),
|
||||
role="system")
|
||||
self.eventmanager.send_event(
|
||||
EventType.SystemError,
|
||||
{
|
||||
"type": "module",
|
||||
"module_id": module_id,
|
||||
"module_name": module_name,
|
||||
"module_method": method,
|
||||
"error": str(err),
|
||||
"traceback": traceback.format_exc()
|
||||
}
|
||||
)
|
||||
|
||||
def __execute_plugin_modules(self, method: str, result: Any, *args, **kwargs) -> Any:
|
||||
"""
|
||||
执行插件模块
|
||||
"""
|
||||
for plugin, module_dict in self.pluginmanager.get_plugin_modules().items():
|
||||
plugin_id, plugin_name = plugin
|
||||
if method in module_dict:
|
||||
@@ -102,7 +146,7 @@ class ChainBase(metaclass=ABCMeta):
|
||||
if func:
|
||||
try:
|
||||
logger.info(f"请求插件 {plugin_name} 执行:{method} ...")
|
||||
if is_result_empty(result):
|
||||
if self.__is_valid_empty(result):
|
||||
# 返回None,第一次执行或者需继续执行下一模块
|
||||
result = func(*args, **kwargs)
|
||||
elif isinstance(result, list):
|
||||
@@ -113,29 +157,44 @@ class ChainBase(metaclass=ABCMeta):
|
||||
else:
|
||||
break
|
||||
except Exception as err:
|
||||
if kwargs.get("raise_exception"):
|
||||
raise
|
||||
logger.error(
|
||||
f"运行插件 {plugin_id} 模块 {method} 出错:{str(err)}\n{traceback.format_exc()}")
|
||||
self.messagehelper.put(title=f"{plugin_name} 发生了错误",
|
||||
message=str(err),
|
||||
role="plugin")
|
||||
self.eventmanager.send_event(
|
||||
EventType.SystemError,
|
||||
{
|
||||
"type": "plugin",
|
||||
"plugin_id": plugin_id,
|
||||
"plugin_name": plugin_name,
|
||||
"plugin_method": method,
|
||||
"error": str(err),
|
||||
"traceback": traceback.format_exc()
|
||||
}
|
||||
)
|
||||
if not is_result_empty(result) and not isinstance(result, list):
|
||||
# 插件模块返回结果不为空且不是列表,直接返回
|
||||
return result
|
||||
self.__handle_plugin_error(err, plugin_id, plugin_name, method, **kwargs)
|
||||
return result
|
||||
|
||||
# 系统模块
|
||||
async def __async_execute_plugin_modules(self, method: str, result: Any, *args, **kwargs) -> Any:
|
||||
"""
|
||||
异步执行插件模块
|
||||
"""
|
||||
for plugin, module_dict in self.pluginmanager.get_plugin_modules().items():
|
||||
plugin_id, plugin_name = plugin
|
||||
if method in module_dict:
|
||||
func = module_dict[method]
|
||||
if func:
|
||||
try:
|
||||
logger.info(f"请求插件 {plugin_name} 执行:{method} ...")
|
||||
if self.__is_valid_empty(result):
|
||||
# 返回None,第一次执行或者需继续执行下一模块
|
||||
if inspect.iscoroutinefunction(func):
|
||||
result = await func(*args, **kwargs)
|
||||
else:
|
||||
result = func(*args, **kwargs)
|
||||
elif isinstance(result, list):
|
||||
# 返回为列表,有多个模块运行结果时进行合并
|
||||
if inspect.iscoroutinefunction(func):
|
||||
temp = await func(*args, **kwargs)
|
||||
else:
|
||||
temp = func(*args, **kwargs)
|
||||
if isinstance(temp, list):
|
||||
result.extend(temp)
|
||||
else:
|
||||
break
|
||||
except Exception as err:
|
||||
self.__handle_plugin_error(err, plugin_id, plugin_name, method, **kwargs)
|
||||
return result
|
||||
|
||||
def __execute_system_modules(self, method: str, result: Any, *args, **kwargs) -> Any:
|
||||
"""
|
||||
执行系统模块
|
||||
"""
|
||||
logger.debug(f"请求系统模块执行:{method} ...")
|
||||
for module in sorted(self.modulemanager.get_running_modules(method), key=lambda x: x.get_priority()):
|
||||
module_id = module.__class__.__name__
|
||||
@@ -146,7 +205,7 @@ class ChainBase(metaclass=ABCMeta):
|
||||
module_name = module_id
|
||||
try:
|
||||
func = getattr(module, method)
|
||||
if is_result_empty(result):
|
||||
if self.__is_valid_empty(result):
|
||||
# 返回None,第一次执行或者需继续执行下一模块
|
||||
result = func(*args, **kwargs)
|
||||
elif ObjectUtils.check_signature(func, result):
|
||||
@@ -161,26 +220,85 @@ class ChainBase(metaclass=ABCMeta):
|
||||
# 中止继续执行
|
||||
break
|
||||
except Exception as err:
|
||||
if kwargs.get("raise_exception"):
|
||||
raise
|
||||
logger.error(
|
||||
f"运行模块 {module_id}.{method} 出错:{str(err)}\n{traceback.format_exc()}")
|
||||
self.messagehelper.put(title=f"{module_name}发生了错误",
|
||||
message=str(err),
|
||||
role="system")
|
||||
self.eventmanager.send_event(
|
||||
EventType.SystemError,
|
||||
{
|
||||
"type": "module",
|
||||
"module_id": module_id,
|
||||
"module_name": module_name,
|
||||
"module_method": method,
|
||||
"error": str(err),
|
||||
"traceback": traceback.format_exc()
|
||||
}
|
||||
)
|
||||
self.__handle_system_error(err, module_id, module_name, method, **kwargs)
|
||||
return result
|
||||
|
||||
async def __async_execute_system_modules(self, method: str, result: Any, *args, **kwargs) -> Any:
|
||||
"""
|
||||
异步执行系统模块
|
||||
"""
|
||||
logger.debug(f"请求系统模块执行:{method} ...")
|
||||
for module in sorted(self.modulemanager.get_running_modules(method), key=lambda x: x.get_priority()):
|
||||
module_id = module.__class__.__name__
|
||||
try:
|
||||
module_name = module.get_name()
|
||||
except Exception as err:
|
||||
logger.debug(f"获取模块名称出错:{str(err)}")
|
||||
module_name = module_id
|
||||
try:
|
||||
func = getattr(module, method)
|
||||
if self.__is_valid_empty(result):
|
||||
# 返回None,第一次执行或者需继续执行下一模块
|
||||
if inspect.iscoroutinefunction(func):
|
||||
result = await func(*args, **kwargs)
|
||||
else:
|
||||
result = func(*args, **kwargs)
|
||||
elif ObjectUtils.check_signature(func, result):
|
||||
# 返回结果与方法签名一致,将结果传入
|
||||
if inspect.iscoroutinefunction(func):
|
||||
result = await func(result)
|
||||
else:
|
||||
result = func(result)
|
||||
elif isinstance(result, list):
|
||||
# 返回为列表,有多个模块运行结果时进行合并
|
||||
if inspect.iscoroutinefunction(func):
|
||||
temp = await func(*args, **kwargs)
|
||||
else:
|
||||
temp = func(*args, **kwargs)
|
||||
if isinstance(temp, list):
|
||||
result.extend(temp)
|
||||
else:
|
||||
# 中止继续执行
|
||||
break
|
||||
except Exception as err:
|
||||
self.__handle_system_error(err, module_id, module_name, method, **kwargs)
|
||||
return result
|
||||
|
||||
def run_module(self, method: str, *args, **kwargs) -> Any:
|
||||
"""
|
||||
运行包含该方法的所有模块,然后返回结果
|
||||
当kwargs包含命名参数raise_exception时,如模块方法抛出异常且raise_exception为True,则同步抛出异常
|
||||
"""
|
||||
result = None
|
||||
|
||||
# 执行插件模块
|
||||
result = self.__execute_plugin_modules(method, result, *args, **kwargs)
|
||||
|
||||
if not self.__is_valid_empty(result) and not isinstance(result, list):
|
||||
# 插件模块返回结果不为空且不是列表,直接返回
|
||||
return result
|
||||
|
||||
# 执行系统模块
|
||||
return self.__execute_system_modules(method, result, *args, **kwargs)
|
||||
|
||||
async def async_run_module(self, method: str, *args, **kwargs) -> Any:
|
||||
"""
|
||||
异步运行包含该方法的所有模块,然后返回结果
|
||||
当kwargs包含命名参数raise_exception时,如模块方法抛出异常且raise_exception为True,则同步抛出异常
|
||||
支持异步和同步方法的混合调用
|
||||
"""
|
||||
result = None
|
||||
|
||||
# 执行插件模块
|
||||
result = await self.__async_execute_plugin_modules(method, result, *args, **kwargs)
|
||||
|
||||
if not self.__is_valid_empty(result) and not isinstance(result, list):
|
||||
# 插件模块返回结果不为空且不是列表,直接返回
|
||||
return result
|
||||
|
||||
# 执行系统模块
|
||||
return await self.__async_execute_system_modules(method, result, *args, **kwargs)
|
||||
|
||||
def recognize_media(self, meta: MetaBase = None,
|
||||
mtype: Optional[MediaType] = None,
|
||||
tmdbid: Optional[int] = None,
|
||||
|
||||
@@ -2066,8 +2066,8 @@ class TmdbApi:
|
||||
logger.error(str(e))
|
||||
return []
|
||||
|
||||
async def async_get_movie_credits(self, tmdbid: int, page: Optional[int] = 1, count: Optional[int] = 24) -> List[
|
||||
dict]:
|
||||
async def async_get_movie_credits(self, tmdbid: int,
|
||||
page: Optional[int] = 1, count: Optional[int] = 24) -> List[dict]:
|
||||
"""
|
||||
获取电影的演职员列表(异步版本)
|
||||
"""
|
||||
@@ -2150,8 +2150,8 @@ class TmdbApi:
|
||||
logger.error(str(e))
|
||||
return {}
|
||||
|
||||
async def async_get_person_credits(self, person_id: int, page: Optional[int] = 1, count: Optional[int] = 24) -> \
|
||||
List[dict]:
|
||||
async def async_get_person_credits(self, person_id: int,
|
||||
page: Optional[int] = 1, count: Optional[int] = 24) -> List[dict]:
|
||||
"""
|
||||
获取人物参演作品(异步版本)
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user