From 4f304a70b76d7f00a206f014e77d4c2c875e1ac3 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 8 Sep 2025 16:59:29 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=9C=A8=E5=AD=90=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E4=B8=AD=E6=93=8D=E4=BD=9C=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/modules/filemanager/transhandler.py | 206 ++++++++++++------------ 1 file changed, 101 insertions(+), 105 deletions(-) diff --git a/app/modules/filemanager/transhandler.py b/app/modules/filemanager/transhandler.py index b40ca3f9..f618c101 100644 --- a/app/modules/filemanager/transhandler.py +++ b/app/modules/filemanager/transhandler.py @@ -1,4 +1,8 @@ +import multiprocessing +import os import re +import shutil +import sys from pathlib import Path from threading import Lock from typing import Optional, List, Tuple @@ -15,9 +19,9 @@ from app.helper.message import TemplateHelper from app.log import logger from app.modules.filemanager.storages import StorageBase from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData +from app.schemas import TransferRenameEventData from app.schemas.types import MediaType, ChainEventType from app.utils.system import SystemUtils -from app.schemas import TransferRenameEventData lock = Lock() @@ -239,7 +243,8 @@ class TransHandler: overflag = True if not overflag: # 目标文件已存在 - logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}") + logger.info( + f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}") if overwrite_mode == 'always': # 总是覆盖同名文件 overflag = True @@ -323,127 +328,118 @@ class TransHandler: :param target_file: 目标文件路径 :param transfer_type: 整理方式 """ - - def __get_targetitem(_path: Path) -> FileItem: - """ - 获取文件信息 - """ - return FileItem( - storage=target_storage, - path=_path.as_posix(), - name=_path.name, - basename=_path.stem, - type="file", - size=_path.stat().st_size, - extension=_path.suffix.lstrip('.'), - modify_time=_path.stat().st_mtime - ) - if (fileitem.storage != target_storage and fileitem.storage != "local" and target_storage != "local"): return None, f"不支持 {fileitem.storage} 到 {target_storage} 的文件整理" - # 加锁 - with lock: - if fileitem.storage == "local" and target_storage == "local": - # 创建目录 - if not target_file.parent.exists(): - target_file.parent.mkdir(parents=True) - # 本地到本地 - if transfer_type == "copy": - state = source_oper.copy(fileitem, target_file.parent, target_file.name) - elif transfer_type == "move": - state = source_oper.move(fileitem, target_file.parent, target_file.name) - elif transfer_type == "link": - state = source_oper.link(fileitem, target_file) - elif transfer_type == "softlink": - state = source_oper.softlink(fileitem, target_file) - else: - return None, f"不支持的整理方式:{transfer_type}" - if state: - return __get_targetitem(target_file), "" - else: - return None, f"{fileitem.path} {transfer_type} 失败" - elif fileitem.storage == "local" and target_storage != "local": - # 本地到网盘 - filepath = Path(fileitem.path) - if not filepath.exists(): - return None, f"文件 {filepath} 不存在" - if transfer_type == "copy": - # 复制 - # 根据目的路径创建文件夹 - target_fileitem = target_oper.get_folder(target_file.parent) - if target_fileitem: - # 上传文件 - new_item = target_oper.upload(target_fileitem, filepath, target_file.name) - if new_item: - return new_item, "" - else: - return None, f"{fileitem.path} 上传 {target_storage} 失败" + def _do_transfer() -> Tuple[bool, str]: + """ + 执行文件整理 + """ + try: + if fileitem.storage == "local" and target_storage == "local": + if not target_file.parent.exists(): + target_file.parent.mkdir(parents=True) + if transfer_type == "copy": + shutil.copy2(fileitem.path, target_file.as_posix()) + elif transfer_type == "move": + SystemUtils.move(Path(fileitem.path), target_file) + elif transfer_type == "link": + os.link(fileitem.path, target_file) + elif transfer_type == "softlink": + os.symlink(fileitem.path, target_file) else: - return None, f"【{target_storage}】{target_file.parent} 目录获取失败" - elif transfer_type == "move": - # 移动 - # 根据目的路径获取文件夹 - target_fileitem = target_oper.get_folder(target_file.parent) - if target_fileitem: - # 上传文件 + return False, f"不支持的整理方式:{transfer_type}" + return True, "" + elif fileitem.storage == "local" and target_storage != "local": + filepath = Path(fileitem.path) + if not filepath.exists(): + return False, f"文件 {filepath} 不存在" + if transfer_type in ["copy", "move"]: + target_fileitem = target_oper.get_folder(target_file.parent) + if not target_fileitem: + return False, f"【{target_storage}】{target_file.parent} 目录获取失败" new_item = target_oper.upload(target_fileitem, filepath, target_file.name) - if new_item: - # 删除源文件 + if not new_item: + return False, f"{fileitem.path} 上传 {target_storage} 失败" + if transfer_type == "move": source_oper.delete(fileitem) - return new_item, "" - else: - return None, f"{fileitem.path} 上传 {target_storage} 失败" + return True, "" else: - return None, f"【{target_storage}】{target_file.parent} 目录获取失败" - elif fileitem.storage != "local" and target_storage == "local": - # 网盘到本地 - if target_file.exists(): - logger.warn(f"文件已存在:{target_file}") - return __get_targetitem(target_file), "" - # 网盘到本地 - if transfer_type in ["copy", "move"]: - # 下载 - tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent) - if tmp_file: - # 创建目录 + return False, f"不支持的整理方式:{transfer_type}" + elif fileitem.storage != "local" and target_storage == "local": + if target_file.exists(): + logger.warn(f"文件已存在:{target_file}") + return True, "" + if transfer_type in ["copy", "move"]: + tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent) + if not tmp_file: + return False, f"{fileitem.path} {fileitem.storage} 下载失败" if not target_file.parent.exists(): target_file.parent.mkdir(parents=True) - # 将tmp_file移动后target_file SystemUtils.move(tmp_file, target_file) if transfer_type == "move": - # 删除源文件 source_oper.delete(fileitem) - return __get_targetitem(target_file), "" + return True, "" else: - return None, f"{fileitem.path} {fileitem.storage} 下载失败" - elif fileitem.storage == target_storage: - # 同一网盘 - if transfer_type == "copy": - # 复制文件到新目录 + return False, f"不支持的整理方式:{transfer_type}" + elif fileitem.storage == target_storage: target_fileitem = target_oper.get_folder(target_file.parent) - if target_fileitem: - if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name): - return target_oper.get_item(target_file), "" - else: - return None, f"【{target_storage}】{fileitem.path} 复制文件失败" + if not target_fileitem: + return False, f"【{target_storage}】{target_file.parent} 目录获取失败" + if transfer_type in ["copy", "move"]: + if not source_oper.move(fileitem, Path(target_fileitem.path), target_file.name): + return False, f"【{target_storage}】{fileitem.path} {'复制' if transfer_type == 'copy' else '移动'}文件失败" + return True, "" else: - return None, f"【{target_storage}】{target_file.parent} 目录获取失败" - elif transfer_type == "move": - # 移动文件到新目录 - target_fileitem = target_oper.get_folder(target_file.parent) - if target_fileitem: - if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name): - return target_oper.get_item(target_file), "" - else: - return None, f"【{target_storage}】{fileitem.path} 移动文件失败" - else: - return None, f"【{target_storage}】{target_file.parent} 目录获取失败" + return False, f"不支持的整理方式:{transfer_type}" else: - return None, f"不支持的整理方式:{transfer_type}" + return False, "未知错误" + except Exception as _e: + return False, str(_e) - return None, "未知错误" + def _worker_action(): + """ + 子进程执行函数 + """ + _success, _msg = _do_transfer() + if _success: + sys.exit(0) + logger.error(f"子进程执行失败:{_msg}") + sys.exit(10) + + # 在新进程中执行以降低内存占用 + try: + ctx = multiprocessing.get_context("fork") + except ValueError: + ctx = None + + if ctx is not None: + # 新进程执行 + logger.info(f"在新进程中整理文件 {fileitem.path}") + proc = ctx.Process(target=_worker_action) + # 加锁以避免与当前进程内其他并发调用发生竞争 + with lock: + proc.start() + proc.join() + + if proc.exitcode == 0: + # 成功 + item = target_oper.get_item(target_file) + if item: + return item, "" + return None, f"【{target_storage}】{target_file} 获取失败" + else: + return None, f"{fileitem.path} {transfer_type} 失败" + else: + # 当前进程执行 + success, errmsg = _do_transfer() + if success: + item = target_oper.get_item(target_file) + if item: + return item, "" + return None, f"【{target_storage}】{target_file} 获取失败" + return None, errmsg def __transfer_other_files(self, fileitem: FileItem, target_storage: str, source_oper: StorageBase, target_oper: StorageBase,