mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-11 06:26:46 +08:00
122 lines
4.9 KiB
Python
122 lines
4.9 KiB
Python
# -*- coding: utf-8 -*-
|
||
import importlib
|
||
import pkgutil
|
||
import traceback
|
||
from pathlib import Path
|
||
from typing import List, Any, Callable
|
||
|
||
from app.log import logger
|
||
|
||
FilterFuncType = Callable[[str, Any], bool]
|
||
|
||
|
||
def _default_filter(name: str, obj: Any) -> bool:
|
||
"""
|
||
默认过滤器
|
||
"""
|
||
return True if name and obj else False
|
||
|
||
|
||
class ModuleHelper:
|
||
"""
|
||
模块动态加载
|
||
"""
|
||
|
||
@classmethod
|
||
def load(cls, package_path: str, filter_func: FilterFuncType = _default_filter) -> List[Any]:
|
||
"""
|
||
导入模块
|
||
:param package_path: 父包名
|
||
:param filter_func: 子模块过滤函数,入参为模块名和模块对象,返回True则导入,否则不导入
|
||
:return: 导入的模块对象列表
|
||
"""
|
||
|
||
submodules: list = []
|
||
loaded_modules = set()
|
||
packages = importlib.import_module(package_path)
|
||
for importer, package_name, _ in pkgutil.iter_modules(packages.__path__):
|
||
try:
|
||
if package_name.startswith('_'):
|
||
continue
|
||
full_package_name = f'{package_path}.{package_name}'
|
||
module = importlib.import_module(full_package_name)
|
||
importlib.reload(module)
|
||
for name, obj in module.__dict__.items():
|
||
if name.startswith('_'):
|
||
continue
|
||
if isinstance(obj, type) and filter_func(name, obj):
|
||
if name in loaded_modules:
|
||
continue
|
||
loaded_modules.add(name)
|
||
submodules.append(obj)
|
||
except Exception as err:
|
||
logger.debug(f'加载模块 {package_name} 失败:{str(err)} - {traceback.format_exc()}')
|
||
|
||
return submodules
|
||
|
||
@classmethod
|
||
def load_with_pre_filter(cls, package_path: str, filter_func: FilterFuncType = _default_filter) -> List[Any]:
|
||
"""
|
||
导入子模块
|
||
:param package_path: 父包名
|
||
:param filter_func: 子模块过滤函数,入参为模块名和模块对象,返回True则导入,否则不导入
|
||
:return: 导入的模块对象列表
|
||
"""
|
||
|
||
submodules: list = []
|
||
packages = importlib.import_module(package_path)
|
||
|
||
def reload_module_objects(target_module):
|
||
"""加载模块并返回对象"""
|
||
importlib.reload(target_module)
|
||
# reload后,重新过滤已经重新加载后的模块中的对象
|
||
return [
|
||
obj for name, obj in target_module.__dict__.items()
|
||
if not name.startswith('_') and isinstance(obj, type) and filter_func(name, obj)
|
||
]
|
||
|
||
def reload_sub_modules(parent_module, parent_module_name):
|
||
"""重新加载一级子模块"""
|
||
for sub_importer, sub_module_name, sub_is_pkg in pkgutil.walk_packages(parent_module.__path__,
|
||
parent_module_name + '.'):
|
||
try:
|
||
full_sub_module = importlib.import_module(sub_module_name)
|
||
importlib.reload(full_sub_module)
|
||
except Exception as sub_err:
|
||
logger.debug(f'加载子模块 {sub_module_name} 失败:{str(sub_err)} - {traceback.format_exc()}')
|
||
|
||
# 遍历包中的所有子模块
|
||
for importer, package_name, is_pkg in pkgutil.iter_modules(packages.__path__):
|
||
if package_name.startswith('_'):
|
||
continue
|
||
full_package_name = f'{package_path}.{package_name}'
|
||
try:
|
||
module = importlib.import_module(full_package_name)
|
||
# 预检查模块中的对象
|
||
candidates = [(name, obj) for name, obj in module.__dict__.items() if
|
||
not name.startswith('_') and isinstance(obj, type)]
|
||
# 确定是否需要重新加载
|
||
if any(filter_func(name, obj) for name, obj in candidates):
|
||
# 如果子模块是包,重新加载其子模块
|
||
if is_pkg:
|
||
reload_sub_modules(module, full_package_name)
|
||
submodules.extend(reload_module_objects(module))
|
||
except Exception as err:
|
||
logger.debug(f'加载模块 {package_name} 失败:{str(err)} - {traceback.format_exc()}')
|
||
|
||
return submodules
|
||
|
||
@staticmethod
|
||
def dynamic_import_all_modules(base_path: Path, package_name: str):
|
||
"""
|
||
动态导入目录下所有模块
|
||
"""
|
||
modules = []
|
||
# 遍历文件夹,找到所有模块文件
|
||
for file in base_path.glob("*.py"):
|
||
file_name = file.stem
|
||
if file_name != "__init__":
|
||
modules.append(file_name)
|
||
full_module_name = f"{package_name}.{file_name}"
|
||
importlib.import_module(full_module_name)
|