feat:在子进程中操作文件

This commit is contained in:
jxxghp
2025-09-08 16:59:29 +08:00
parent 59a54d4f04
commit 4f304a70b7

View File

@@ -1,4 +1,8 @@
import multiprocessing
import os
import re
import shutil
import sys
from pathlib import Path
from threading import Lock
from typing import Optional, List, Tuple
@@ -15,9 +19,9 @@ from app.helper.message import TemplateHelper
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileItem, TransferInterceptEventData
from app.schemas import TransferRenameEventData
from app.schemas.types import MediaType, ChainEventType
from app.utils.system import SystemUtils
from app.schemas import TransferRenameEventData
lock = Lock()
@@ -239,7 +243,8 @@ class TransHandler:
overflag = True
if not overflag:
# 目标文件已存在
logger.info(f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
logger.info(
f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
if overwrite_mode == 'always':
# 总是覆盖同名文件
overflag = True
@@ -323,127 +328,118 @@ class TransHandler:
:param target_file: 目标文件路径
:param transfer_type: 整理方式
"""
def __get_targetitem(_path: Path) -> FileItem:
"""
获取文件信息
"""
return FileItem(
storage=target_storage,
path=_path.as_posix(),
name=_path.name,
basename=_path.stem,
type="file",
size=_path.stat().st_size,
extension=_path.suffix.lstrip('.'),
modify_time=_path.stat().st_mtime
)
if (fileitem.storage != target_storage
and fileitem.storage != "local" and target_storage != "local"):
return None, f"不支持 {fileitem.storage}{target_storage} 的文件整理"
# 加锁
with lock:
if fileitem.storage == "local" and target_storage == "local":
# 创建目录
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True)
# 本地到本地
if transfer_type == "copy":
state = source_oper.copy(fileitem, target_file.parent, target_file.name)
elif transfer_type == "move":
state = source_oper.move(fileitem, target_file.parent, target_file.name)
elif transfer_type == "link":
state = source_oper.link(fileitem, target_file)
elif transfer_type == "softlink":
state = source_oper.softlink(fileitem, target_file)
else:
return None, f"不支持的整理方式:{transfer_type}"
if state:
return __get_targetitem(target_file), ""
else:
return None, f"{fileitem.path} {transfer_type} 失败"
elif fileitem.storage == "local" and target_storage != "local":
# 本地到网盘
filepath = Path(fileitem.path)
if not filepath.exists():
return None, f"文件 {filepath} 不存在"
if transfer_type == "copy":
# 复制
# 根据目的路径创建文件夹
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
# 上传文件
new_item = target_oper.upload(target_fileitem, filepath, target_file.name)
if new_item:
return new_item, ""
else:
return None, f"{fileitem.path} 上传 {target_storage} 失败"
def _do_transfer() -> Tuple[bool, str]:
"""
执行文件整理
"""
try:
if fileitem.storage == "local" and target_storage == "local":
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True)
if transfer_type == "copy":
shutil.copy2(fileitem.path, target_file.as_posix())
elif transfer_type == "move":
SystemUtils.move(Path(fileitem.path), target_file)
elif transfer_type == "link":
os.link(fileitem.path, target_file)
elif transfer_type == "softlink":
os.symlink(fileitem.path, target_file)
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "move":
# 移动
# 根据目的路径获取文件夹
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
# 上传文件
return False, f"不支持的整理方式:{transfer_type}"
return True, ""
elif fileitem.storage == "local" and target_storage != "local":
filepath = Path(fileitem.path)
if not filepath.exists():
return False, f"文件 {filepath} 不存在"
if transfer_type in ["copy", "move"]:
target_fileitem = target_oper.get_folder(target_file.parent)
if not target_fileitem:
return False, f"{target_storage}{target_file.parent} 目录获取失败"
new_item = target_oper.upload(target_fileitem, filepath, target_file.name)
if new_item:
# 删除源文件
if not new_item:
return False, f"{fileitem.path} 上传 {target_storage} 失败"
if transfer_type == "move":
source_oper.delete(fileitem)
return new_item, ""
else:
return None, f"{fileitem.path} 上传 {target_storage} 失败"
return True, ""
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif fileitem.storage != "local" and target_storage == "local":
# 网盘到本地
if target_file.exists():
logger.warn(f"文件已存在:{target_file}")
return __get_targetitem(target_file), ""
# 网盘到本地
if transfer_type in ["copy", "move"]:
# 下载
tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent)
if tmp_file:
# 创建目录
return False, f"不支持的整理方式:{transfer_type}"
elif fileitem.storage != "local" and target_storage == "local":
if target_file.exists():
logger.warn(f"文件已存在:{target_file}")
return True, ""
if transfer_type in ["copy", "move"]:
tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent)
if not tmp_file:
return False, f"{fileitem.path} {fileitem.storage} 下载失败"
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True)
# 将tmp_file移动后target_file
SystemUtils.move(tmp_file, target_file)
if transfer_type == "move":
# 删除源文件
source_oper.delete(fileitem)
return __get_targetitem(target_file), ""
return True, ""
else:
return None, f"{fileitem.path} {fileitem.storage} 下载失败"
elif fileitem.storage == target_storage:
# 同一网盘
if transfer_type == "copy":
# 复制文件到新目录
return False, f"不支持的整理方式:{transfer_type}"
elif fileitem.storage == target_storage:
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 复制文件失败"
if not target_fileitem:
return False, f"{target_storage}{target_file.parent} 目录获取失败"
if transfer_type in ["copy", "move"]:
if not source_oper.move(fileitem, Path(target_fileitem.path), target_file.name):
return False, f"{target_storage}{fileitem.path} {'复制' if transfer_type == 'copy' else '移动'}文件失败"
return True, ""
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "move":
# 移动文件到新目录
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 移动文件失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
return False, f"不支持的整理方式:{transfer_type}"
else:
return None, f"不支持的整理方式:{transfer_type}"
return False, "未知错误"
except Exception as _e:
return False, str(_e)
return None, "未知错误"
def _worker_action():
"""
子进程执行函数
"""
_success, _msg = _do_transfer()
if _success:
sys.exit(0)
logger.error(f"子进程执行失败:{_msg}")
sys.exit(10)
# 在新进程中执行以降低内存占用
try:
ctx = multiprocessing.get_context("fork")
except ValueError:
ctx = None
if ctx is not None:
# 新进程执行
logger.info(f"在新进程中整理文件 {fileitem.path}")
proc = ctx.Process(target=_worker_action)
# 加锁以避免与当前进程内其他并发调用发生竞争
with lock:
proc.start()
proc.join()
if proc.exitcode == 0:
# 成功
item = target_oper.get_item(target_file)
if item:
return item, ""
return None, f"{target_storage}{target_file} 获取失败"
else:
return None, f"{fileitem.path} {transfer_type} 失败"
else:
# 当前进程执行
success, errmsg = _do_transfer()
if success:
item = target_oper.get_item(target_file)
if item:
return item, ""
return None, f"{target_storage}{target_file} 获取失败"
return None, errmsg
def __transfer_other_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,