diff --git a/app/modules/filemanager/transhandler.py b/app/modules/filemanager/transhandler.py index b40ca3f9..23f27421 100644 --- a/app/modules/filemanager/transhandler.py +++ b/app/modules/filemanager/transhandler.py @@ -1,3 +1,5 @@ +import multiprocessing +import os import re from pathlib import Path from threading import Lock @@ -12,16 +14,94 @@ from app.core.meta import MetaBase from app.core.metainfo import MetaInfoPath from app.helper.directory import DirectoryHelper from app.helper.message import TemplateHelper +from app.helper.module import ModuleHelper from app.log import logger from app.modules.filemanager.storages import StorageBase -from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData +from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData, \ + TransferRenameEventData from app.schemas.types import MediaType, ChainEventType from app.utils.system import SystemUtils -from app.schemas import TransferRenameEventData lock = Lock() +def _transfer_command_worker(args): + """ + 在子进程中执行文件转移命令的工作函数 + :param args: 包含所有必要参数的元组 + """ + # 解包参数 + (fileitem_dict, target_storage, target_file_str, transfer_type, result_queue) = args + + # 重新创建存储操作对象 + storage_schemas = ModuleHelper.load('app.modules.filemanager.storages', + filter_func=lambda _, obj: hasattr(obj, 'schema') and obj.schema) + + def __get_storage_oper(_storage: str, _func: Optional[str] = None) -> Optional[StorageBase]: + """ + 获取存储操作对象 + """ + for storage_schema in storage_schemas: + if storage_schema.schema \ + and storage_schema.schema.value == _storage \ + and (not _func or hasattr(storage_schema, _func)): + return storage_schema() + return None + + try: + # 重新创建 FileItem 对象 + fileitem = FileItem(**fileitem_dict) + target_file = Path(target_file_str) + + # 获取存储操作对象 + source_oper = __get_storage_oper(fileitem.storage) + target_oper = __get_storage_oper(target_storage) + + if not source_oper or not target_oper: + result_queue.put((None, f"无法创建存储操作对象: source={fileitem.storage}, target={target_storage}")) + return + + # 执行原有的转移逻辑 + result = TransHandler.execute_transfer_command( + fileitem, target_storage, source_oper, target_oper, + target_file, transfer_type + ) + + # 将结果放入队列 + result_queue.put(result) + + except Exception as e: + result_queue.put((None, str(e))) + + +def _supports_fork(): + """ + 检测当前系统是否支持 fork + :return: True 如果支持 fork,False 否则 + """ + try: + # Windows 不支持 fork + if os.name == 'nt': + return False + + # 检查可用的启动方法 + available_methods = multiprocessing.get_all_start_methods() + if 'fork' not in available_methods: + return False + + # 尝试设置 fork 方法 + try: + multiprocessing.set_start_method('fork', force=True) + return True + except RuntimeError: + # 如果已经设置过启动方法,检查当前方法 + current_method = multiprocessing.get_start_method() + return current_method == 'fork' + except Exception as e: + logger.error(f"检测 fork 支持时出错: {str(e)}") + return False + + class TransHandler: """ 文件转移整理类 @@ -239,7 +319,8 @@ class TransHandler: overflag = True if not overflag: # 目标文件已存在 - logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}") + logger.info( + f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}") if overwrite_mode == 'always': # 总是覆盖同名文件 overflag = True @@ -315,6 +396,80 @@ class TransHandler: target_file: Path, transfer_type: str, ) -> Tuple[Optional[FileItem], str]: """ + 处理单个文件,支持在 fork 进程中运行以提高内存回收 + :param fileitem: 源文件 + :param target_storage: 目标存储 + :param source_oper: 源存储操作对象 + :param target_oper: 目标存储操作对象 + :param target_file: 目标文件路径 + :param transfer_type: 整理方式 + """ + # 检查是否支持 fork + if _supports_fork(): + logger.info(f"[PID:{os.getpid()}] 在新的进程中整理文件: {fileitem.path}") + + # 创建队列用于进程间通信 + result_queue = multiprocessing.Queue() + + # 准备参数 + fileitem_dict = { + 'storage': fileitem.storage, + 'path': fileitem.path, + 'name': fileitem.name, + 'basename': fileitem.basename, + 'type': fileitem.type, + 'size': fileitem.size, + 'extension': fileitem.extension, + 'modify_time': fileitem.modify_time + } + + args = ( + fileitem_dict, + target_storage, + str(target_file), + transfer_type, + result_queue + ) + + # 创建进程 + process = multiprocessing.Process( + target=_transfer_command_worker, + args=(args,) + ) + + # 启动进程 + process.start() + + # 等待进程完成 + process.join() + + # 检查进程是否正常退出 + if process.exitcode != 0: + logger.error(f"文件转移进程异常退出,退出码: {process.exitcode}") + return None, f"文件转移进程异常退出" + + # 从队列获取结果 + try: + result = result_queue.get(timeout=1) + logger.debug(f"文件转移在子进程中完成: {fileitem.path}") + return result + except Exception as e: + logger.error(f"无法从子进程获取结果:{str(e)}") + return None, f"无法从子进程获取结果:{str(e)}" + else: + # 不支持 fork,直接在当前线程执行 + logger.debug("当前系统不支持 fork,在当前线程中执行文件转移") + return TransHandler.execute_transfer_command( + fileitem, target_storage, source_oper, target_oper, + target_file, transfer_type + ) + + @staticmethod + def execute_transfer_command(fileitem: FileItem, target_storage: str, + source_oper: StorageBase, target_oper: StorageBase, + target_file: Path, transfer_type: str, + ) -> Tuple[Optional[FileItem], str]: + """ 处理单个文件 :param fileitem: 源文件 :param target_storage: 目标存储