diff --git a/app/modules/filemanager/transhandler.py b/app/modules/filemanager/transhandler.py index f618c101..b40ca3f9 100644 --- a/app/modules/filemanager/transhandler.py +++ b/app/modules/filemanager/transhandler.py @@ -1,8 +1,4 @@ -import multiprocessing -import os import re -import shutil -import sys from pathlib import Path from threading import Lock from typing import Optional, List, Tuple @@ -19,9 +15,9 @@ from app.helper.message import TemplateHelper from app.log import logger from app.modules.filemanager.storages import StorageBase from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData -from app.schemas import TransferRenameEventData from app.schemas.types import MediaType, ChainEventType from app.utils.system import SystemUtils +from app.schemas import TransferRenameEventData lock = Lock() @@ -243,8 +239,7 @@ class TransHandler: overflag = True if not overflag: # 目标文件已存在 - logger.info( - f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}") + logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}") if overwrite_mode == 'always': # 总是覆盖同名文件 overflag = True @@ -328,118 +323,127 @@ class TransHandler: :param target_file: 目标文件路径 :param transfer_type: 整理方式 """ + + def __get_targetitem(_path: Path) -> FileItem: + """ + 获取文件信息 + """ + return FileItem( + storage=target_storage, + path=_path.as_posix(), + name=_path.name, + basename=_path.stem, + type="file", + size=_path.stat().st_size, + extension=_path.suffix.lstrip('.'), + modify_time=_path.stat().st_mtime + ) + if (fileitem.storage != target_storage and fileitem.storage != "local" and target_storage != "local"): return None, f"不支持 {fileitem.storage} 到 {target_storage} 的文件整理" - def _do_transfer() -> Tuple[bool, str]: - """ - 执行文件整理 - """ - try: - if fileitem.storage == "local" and target_storage == "local": - if not target_file.parent.exists(): - target_file.parent.mkdir(parents=True) - if transfer_type == "copy": - shutil.copy2(fileitem.path, target_file.as_posix()) - elif transfer_type == "move": - SystemUtils.move(Path(fileitem.path), target_file) - elif transfer_type == "link": - os.link(fileitem.path, target_file) - elif transfer_type == "softlink": - os.symlink(fileitem.path, target_file) - else: - return False, f"不支持的整理方式:{transfer_type}" - return True, "" - elif fileitem.storage == "local" and target_storage != "local": - filepath = Path(fileitem.path) - if not filepath.exists(): - return False, f"文件 {filepath} 不存在" - if transfer_type in ["copy", "move"]: - target_fileitem = target_oper.get_folder(target_file.parent) - if not target_fileitem: - return False, f"【{target_storage}】{target_file.parent} 目录获取失败" + # 加锁 + with lock: + if fileitem.storage == "local" and target_storage == "local": + # 创建目录 + if not target_file.parent.exists(): + target_file.parent.mkdir(parents=True) + # 本地到本地 + if transfer_type == "copy": + state = source_oper.copy(fileitem, target_file.parent, target_file.name) + elif transfer_type == "move": + state = source_oper.move(fileitem, target_file.parent, target_file.name) + elif transfer_type == "link": + state = source_oper.link(fileitem, target_file) + elif transfer_type == "softlink": + state = source_oper.softlink(fileitem, target_file) + else: + return None, f"不支持的整理方式:{transfer_type}" + if state: + return __get_targetitem(target_file), "" + else: + return None, f"{fileitem.path} {transfer_type} 失败" + elif fileitem.storage == "local" and target_storage != "local": + # 本地到网盘 + filepath = Path(fileitem.path) + if not filepath.exists(): + return None, f"文件 {filepath} 不存在" + if transfer_type == "copy": + # 复制 + # 根据目的路径创建文件夹 + target_fileitem = target_oper.get_folder(target_file.parent) + if target_fileitem: + # 上传文件 new_item = target_oper.upload(target_fileitem, filepath, target_file.name) - if not new_item: - return False, f"{fileitem.path} 上传 {target_storage} 失败" - if transfer_type == "move": - source_oper.delete(fileitem) - return True, "" + if new_item: + return new_item, "" + else: + return None, f"{fileitem.path} 上传 {target_storage} 失败" else: - return False, f"不支持的整理方式:{transfer_type}" - elif fileitem.storage != "local" and target_storage == "local": - if target_file.exists(): - logger.warn(f"文件已存在:{target_file}") - return True, "" - if transfer_type in ["copy", "move"]: - tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent) - if not tmp_file: - return False, f"{fileitem.path} {fileitem.storage} 下载失败" + return None, f"【{target_storage}】{target_file.parent} 目录获取失败" + elif transfer_type == "move": + # 移动 + # 根据目的路径获取文件夹 + target_fileitem = target_oper.get_folder(target_file.parent) + if target_fileitem: + # 上传文件 + new_item = target_oper.upload(target_fileitem, filepath, target_file.name) + if new_item: + # 删除源文件 + source_oper.delete(fileitem) + return new_item, "" + else: + return None, f"{fileitem.path} 上传 {target_storage} 失败" + else: + return None, f"【{target_storage}】{target_file.parent} 目录获取失败" + elif fileitem.storage != "local" and target_storage == "local": + # 网盘到本地 + if target_file.exists(): + logger.warn(f"文件已存在:{target_file}") + return __get_targetitem(target_file), "" + # 网盘到本地 + if transfer_type in ["copy", "move"]: + # 下载 + tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent) + if tmp_file: + # 创建目录 if not target_file.parent.exists(): target_file.parent.mkdir(parents=True) + # 将tmp_file移动后target_file SystemUtils.move(tmp_file, target_file) if transfer_type == "move": + # 删除源文件 source_oper.delete(fileitem) - return True, "" + return __get_targetitem(target_file), "" else: - return False, f"不支持的整理方式:{transfer_type}" - elif fileitem.storage == target_storage: + return None, f"{fileitem.path} {fileitem.storage} 下载失败" + elif fileitem.storage == target_storage: + # 同一网盘 + if transfer_type == "copy": + # 复制文件到新目录 target_fileitem = target_oper.get_folder(target_file.parent) - if not target_fileitem: - return False, f"【{target_storage}】{target_file.parent} 目录获取失败" - if transfer_type in ["copy", "move"]: - if not source_oper.move(fileitem, Path(target_fileitem.path), target_file.name): - return False, f"【{target_storage}】{fileitem.path} {'复制' if transfer_type == 'copy' else '移动'}文件失败" - return True, "" + if target_fileitem: + if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name): + return target_oper.get_item(target_file), "" + else: + return None, f"【{target_storage}】{fileitem.path} 复制文件失败" else: - return False, f"不支持的整理方式:{transfer_type}" + return None, f"【{target_storage}】{target_file.parent} 目录获取失败" + elif transfer_type == "move": + # 移动文件到新目录 + target_fileitem = target_oper.get_folder(target_file.parent) + if target_fileitem: + if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name): + return target_oper.get_item(target_file), "" + else: + return None, f"【{target_storage}】{fileitem.path} 移动文件失败" + else: + return None, f"【{target_storage}】{target_file.parent} 目录获取失败" else: - return False, "未知错误" - except Exception as _e: - return False, str(_e) + return None, f"不支持的整理方式:{transfer_type}" - def _worker_action(): - """ - 子进程执行函数 - """ - _success, _msg = _do_transfer() - if _success: - sys.exit(0) - logger.error(f"子进程执行失败:{_msg}") - sys.exit(10) - - # 在新进程中执行以降低内存占用 - try: - ctx = multiprocessing.get_context("fork") - except ValueError: - ctx = None - - if ctx is not None: - # 新进程执行 - logger.info(f"在新进程中整理文件 {fileitem.path}") - proc = ctx.Process(target=_worker_action) - # 加锁以避免与当前进程内其他并发调用发生竞争 - with lock: - proc.start() - proc.join() - - if proc.exitcode == 0: - # 成功 - item = target_oper.get_item(target_file) - if item: - return item, "" - return None, f"【{target_storage}】{target_file} 获取失败" - else: - return None, f"{fileitem.path} {transfer_type} 失败" - else: - # 当前进程执行 - success, errmsg = _do_transfer() - if success: - item = target_oper.get_item(target_file) - if item: - return item, "" - return None, f"【{target_storage}】{target_file} 获取失败" - return None, errmsg + return None, "未知错误" def __transfer_other_files(self, fileitem: FileItem, target_storage: str, source_oper: StorageBase, target_oper: StorageBase,