feat:在fork进程中执行文件整理

This commit is contained in:
jxxghp
2025-09-13 08:32:31 +08:00
parent 188de34306
commit a123ff2c04

View File

@@ -1,3 +1,5 @@
import multiprocessing
import os
import re
from pathlib import Path
from threading import Lock
@@ -12,16 +14,94 @@ from app.core.meta import MetaBase
from app.core.metainfo import MetaInfoPath
from app.helper.directory import DirectoryHelper
from app.helper.message import TemplateHelper
from app.helper.module import ModuleHelper
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData
from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData, \
TransferRenameEventData
from app.schemas.types import MediaType, ChainEventType
from app.utils.system import SystemUtils
from app.schemas import TransferRenameEventData
lock = Lock()
def _transfer_command_worker(args):
"""
在子进程中执行文件转移命令的工作函数
:param args: 包含所有必要参数的元组
"""
# 解包参数
(fileitem_dict, target_storage, target_file_str, transfer_type, result_queue) = args
# 重新创建存储操作对象
storage_schemas = ModuleHelper.load('app.modules.filemanager.storages',
filter_func=lambda _, obj: hasattr(obj, 'schema') and obj.schema)
def __get_storage_oper(_storage: str, _func: Optional[str] = None) -> Optional[StorageBase]:
"""
获取存储操作对象
"""
for storage_schema in storage_schemas:
if storage_schema.schema \
and storage_schema.schema.value == _storage \
and (not _func or hasattr(storage_schema, _func)):
return storage_schema()
return None
try:
# 重新创建 FileItem 对象
fileitem = FileItem(**fileitem_dict)
target_file = Path(target_file_str)
# 获取存储操作对象
source_oper = __get_storage_oper(fileitem.storage)
target_oper = __get_storage_oper(target_storage)
if not source_oper or not target_oper:
result_queue.put((None, f"无法创建存储操作对象: source={fileitem.storage}, target={target_storage}"))
return
# 执行原有的转移逻辑
result = TransHandler.execute_transfer_command(
fileitem, target_storage, source_oper, target_oper,
target_file, transfer_type
)
# 将结果放入队列
result_queue.put(result)
except Exception as e:
result_queue.put((None, str(e)))
def _supports_fork():
"""
检测当前系统是否支持 fork
:return: True 如果支持 forkFalse 否则
"""
try:
# Windows 不支持 fork
if os.name == 'nt':
return False
# 检查可用的启动方法
available_methods = multiprocessing.get_all_start_methods()
if 'fork' not in available_methods:
return False
# 尝试设置 fork 方法
try:
multiprocessing.set_start_method('fork', force=True)
return True
except RuntimeError:
# 如果已经设置过启动方法,检查当前方法
current_method = multiprocessing.get_start_method()
return current_method == 'fork'
except Exception as e:
logger.error(f"检测 fork 支持时出错: {str(e)}")
return False
class TransHandler:
"""
文件转移整理类
@@ -239,7 +319,8 @@ class TransHandler:
overflag = True
if not overflag:
# 目标文件已存在
logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
logger.info(
f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
if overwrite_mode == 'always':
# 总是覆盖同名文件
overflag = True
@@ -315,6 +396,80 @@ class TransHandler:
target_file: Path, transfer_type: str,
) -> Tuple[Optional[FileItem], str]:
"""
处理单个文件,支持在 fork 进程中运行以提高内存回收
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标文件路径
:param transfer_type: 整理方式
"""
# 检查是否支持 fork
if _supports_fork():
logger.info(f"[PID:{os.getpid()}] 在新的进程中整理文件: {fileitem.path}")
# 创建队列用于进程间通信
result_queue = multiprocessing.Queue()
# 准备参数
fileitem_dict = {
'storage': fileitem.storage,
'path': fileitem.path,
'name': fileitem.name,
'basename': fileitem.basename,
'type': fileitem.type,
'size': fileitem.size,
'extension': fileitem.extension,
'modify_time': fileitem.modify_time
}
args = (
fileitem_dict,
target_storage,
str(target_file),
transfer_type,
result_queue
)
# 创建进程
process = multiprocessing.Process(
target=_transfer_command_worker,
args=(args,)
)
# 启动进程
process.start()
# 等待进程完成
process.join()
# 检查进程是否正常退出
if process.exitcode != 0:
logger.error(f"文件转移进程异常退出,退出码: {process.exitcode}")
return None, f"文件转移进程异常退出"
# 从队列获取结果
try:
result = result_queue.get(timeout=1)
logger.debug(f"文件转移在子进程中完成: {fileitem.path}")
return result
except Exception as e:
logger.error(f"无法从子进程获取结果:{str(e)}")
return None, f"无法从子进程获取结果:{str(e)}"
else:
# 不支持 fork直接在当前线程执行
logger.debug("当前系统不支持 fork在当前线程中执行文件转移")
return TransHandler.execute_transfer_command(
fileitem, target_storage, source_oper, target_oper,
target_file, transfer_type
)
@staticmethod
def execute_transfer_command(fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str,
) -> Tuple[Optional[FileItem], str]:
"""
处理单个文件
:param fileitem: 源文件
:param target_storage: 目标存储