feat:文件整理支持操作类入参

This commit is contained in:
jxxghp
2025-05-01 20:56:17 +08:00
parent e13e1c9ca3
commit ca1f3ac377
2 changed files with 117 additions and 61 deletions

View File

@@ -3,6 +3,7 @@ import gc
import pickle
import traceback
from abc import ABCMeta
from collections.abc import Callable
from pathlib import Path
from typing import Optional, Any, Tuple, List, Set, Union, Dict
@@ -446,7 +447,8 @@ class ChainBase(metaclass=ABCMeta):
target_storage: Optional[str] = None, target_path: Path = None,
transfer_type: Optional[str] = None, scrape: bool = None,
library_type_folder: bool = None, library_category_folder: bool = None,
episodes_info: List[TmdbEpisode] = None) -> Optional[TransferInfo]:
episodes_info: List[TmdbEpisode] = None,
source_oper: Callable = None, target_oper: Callable = None) -> Optional[TransferInfo]:
"""
文件转移
:param fileitem: 文件信息
@@ -460,6 +462,8 @@ class ChainBase(metaclass=ABCMeta):
:param library_type_folder: 是否按类型创建目录
:param library_category_folder: 是否按类别创建目录
:param episodes_info: 当前季的全部集信息
:param source_oper: 源存储操作类
:param target_oper: 目标存储操作类
:return: {path, target_path, message}
"""
return self.run_module("transfer",
@@ -469,7 +473,8 @@ class ChainBase(metaclass=ABCMeta):
transfer_type=transfer_type, scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
episodes_info=episodes_info)
episodes_info=episodes_info,
source_oper=source_oper, target_oper=target_oper)
def transfer_completed(self, hashs: str, downloader: Optional[str] = None) -> None:
"""

View File

@@ -1,7 +1,7 @@
import re
from pathlib import Path
from threading import Lock
from typing import Optional, List, Tuple, Union, Dict
from typing import Optional, List, Tuple, Union, Dict, Callable
from jinja2 import Template
@@ -330,7 +330,8 @@ class FileManagerModule(_ModuleBase):
target_storage: Optional[str] = None, target_path: Path = None,
transfer_type: Optional[str] = None, scrape: Optional[bool] = None,
library_type_folder: Optional[bool] = None, library_category_folder: Optional[bool] = None,
episodes_info: List[TmdbEpisode] = None) -> TransferInfo:
episodes_info: List[TmdbEpisode] = None,
source_oper: Callable = None, target_oper: Callable = None) -> TransferInfo:
"""
文件整理
:param fileitem: 文件信息
@@ -344,6 +345,8 @@ class FileManagerModule(_ModuleBase):
:param library_type_folder: 是否按媒体类型创建目录
:param library_category_folder: 是否按媒体类别创建目录
:param episodes_info: 当前季的全部集信息
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:return: {path, target_path, message}
"""
# 检查目录路径
@@ -411,7 +414,9 @@ class FileManagerModule(_ModuleBase):
need_rename=need_rename,
need_notify=need_notify,
overwrite_mode=overwrite_mode,
episodes_info=episodes_info)
episodes_info=episodes_info,
source_oper=source_oper,
target_oper=target_oper)
def __get_storage_oper(self, _storage: str, _func: Optional[str] = None) -> Optional[StorageBase]:
"""
@@ -430,12 +435,17 @@ class FileManagerModule(_ModuleBase):
"""
pass
def __transfer_command(self, fileitem: FileItem, target_storage: str,
target_file: Path, transfer_type: str) -> Tuple[Optional[FileItem], str]:
@staticmethod
def __transfer_command(fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str,
) -> Tuple[Optional[FileItem], str]:
"""
处理单个文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标文件路径
:param transfer_type: 整理方式
"""
@@ -459,13 +469,6 @@ class FileManagerModule(_ModuleBase):
and fileitem.storage != "local" and target_storage != "local"):
return None, f"不支持 {fileitem.storage}{target_storage} 的文件整理"
# 源操作对象
source_oper: StorageBase = self.__get_storage_oper(fileitem.storage)
# 目的操作对象
target_oper: StorageBase = self.__get_storage_oper(target_storage)
if not source_oper or not target_oper:
return None, f"不支持的存储类型:{fileitem.storage}{target_storage}"
# 加锁
with lock:
if fileitem.storage == "local" and target_storage == "local":
@@ -568,18 +571,23 @@ class FileManagerModule(_ModuleBase):
return None, "未知错误"
def __transfer_other_files(self, fileitem: FileItem, target_storage: str, target_file: Path,
transfer_type: str) -> Tuple[bool, str]:
def __transfer_other_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str) -> Tuple[bool, str]:
"""
根据文件名整理其他相关文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标路径
:param transfer_type: 整理方式
"""
# 整理字幕
state, errmsg = self.__transfer_subtitles(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
if not state:
@@ -587,17 +595,22 @@ class FileManagerModule(_ModuleBase):
# 整理音轨文件
state, errmsg = self.__transfer_audio_track_files(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
return state, errmsg
def __transfer_subtitles(self, fileitem: FileItem, target_storage: str, target_file: Path,
transfer_type: str) -> Tuple[bool, str]:
def __transfer_subtitles(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str) -> Tuple[bool, str]:
"""
根据文件名整理对应字幕文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标路径
:param transfer_type: 整理方式
"""
@@ -617,17 +630,12 @@ class FileManagerModule(_ModuleBase):
# 比对文件名并整理字幕
org_path = Path(fileitem.path)
# 列出所有字幕文件
storage_oper = self.__get_storage_oper(fileitem.storage)
if not storage_oper:
logger.error(f"不支持 {fileitem.storage} 的文件整理")
return False, f"不支持的文件存储:{fileitem.storage}"
# 查找上级文件项
parent_item: FileItem = storage_oper.get_parent(fileitem)
parent_item: FileItem = source_oper.get_parent(fileitem)
if not parent_item:
return False, f"{org_path} 上级目录获取失败"
# 字幕文件列表
file_list: List[FileItem] = storage_oper.list(parent_item) or []
file_list: List[FileItem] = source_oper.list(parent_item) or []
file_list = [f for f in file_list if f.type == "file" and f.extension
and f".{f.extension.lower()}" in settings.RMT_SUBEXT]
if len(file_list) == 0:
@@ -677,9 +685,9 @@ class FileManagerModule(_ModuleBase):
}
new_sub_tag_list = [
(".default" + new_file_type if (
(settings.DEFAULT_SUB == "zh-cn" and new_file_type == ".chi.zh-cn") or
(settings.DEFAULT_SUB == "zh-tw" and new_file_type == ".zh-tw") or
(settings.DEFAULT_SUB == "eng" and new_file_type == ".eng")
(settings.DEFAULT_SUB == "zh-cn" and new_file_type == ".chi.zh-cn") or
(settings.DEFAULT_SUB == "zh-tw" and new_file_type == ".zh-tw") or
(settings.DEFAULT_SUB == "eng" and new_file_type == ".eng")
) else new_file_type) if t == 0 else "%s%s(%s)" % (new_file_type,
new_sub_tag_dict.get(
new_file_type, ""
@@ -693,6 +701,8 @@ class FileManagerModule(_ModuleBase):
logger.debug(f"正在处理字幕:{sub_item.name}")
new_item, errmsg = self.__transfer_command(fileitem=sub_item,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=new_file,
transfer_type=transfer_type)
if new_item:
@@ -705,26 +715,24 @@ class FileManagerModule(_ModuleBase):
logger.info(f"字幕 {new_file} 出错了,原因: {str(error)}")
return True, ""
def __transfer_audio_track_files(self, fileitem: FileItem, target_storage: str, target_file: Path,
transfer_type: str) -> Tuple[bool, str]:
def __transfer_audio_track_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str) -> Tuple[bool, str]:
"""
根据文件名整理对应音轨文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标路径
:param transfer_type: 整理方式
"""
org_path = Path(fileitem.path)
# 列出所有音轨文件
storage_oper = self.__get_storage_oper(fileitem.storage)
if not storage_oper:
logger.error(f"不支持 {fileitem.storage} 的文件整理")
return False, f"不支持的文件存储:{fileitem.storage}"
# 查找上级文件项
parent_item: FileItem = storage_oper.get_parent(fileitem)
parent_item: FileItem = source_oper.get_parent(fileitem)
if not parent_item:
return False, f"{org_path} 上级目录获取失败"
file_list: List[FileItem] = storage_oper.list(parent_item)
file_list: List[FileItem] = source_oper.list(parent_item)
# 匹配音轨文件
pending_file_list: List[FileItem] = [file for file in file_list
if Path(file.name).stem == org_path.stem
@@ -740,6 +748,8 @@ class FileManagerModule(_ModuleBase):
logger.info(f"正在整理音轨文件:{track_file}{new_track_file}")
new_item, errmsg = self.__transfer_command(fileitem=track_file,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=new_track_file,
transfer_type=transfer_type)
if new_item:
@@ -750,21 +760,19 @@ class FileManagerModule(_ModuleBase):
logger.error(f"音轨文件 {org_path.name} 整理失败:{str(error)}")
return True, ""
def __transfer_dir(self, fileitem: FileItem, mediainfo: MediaInfo, transfer_type: str,
target_storage: str, target_path: Path) -> Tuple[Optional[FileItem], str]:
def __transfer_dir(self, fileitem: FileItem, mediainfo: MediaInfo,
source_oper: StorageBase, target_oper: StorageBase,
transfer_type: str, target_storage: str, target_path: Path) -> Tuple[Optional[FileItem], str]:
"""
整理整个文件夹
:param fileitem: 源文件
:param mediainfo: 媒体信息
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param transfer_type: 整理方式
:param target_storage: 目标存储
:param target_path: 目标路径
"""
# 获取目标目录
target_oper: StorageBase = self.__get_storage_oper(target_storage)
if not target_oper:
return None, f"不支持的文件存储:{target_storage}"
logger.info(f"正在整理目录:{fileitem.path}{target_path}")
target_item = target_oper.get_folder(target_path)
if not target_item:
@@ -788,6 +796,8 @@ class FileManagerModule(_ModuleBase):
# 处理所有文件
state, errmsg = self.__transfer_dir_files(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_path=target_path,
transfer_type=transfer_type)
if state:
@@ -795,29 +805,29 @@ class FileManagerModule(_ModuleBase):
else:
return None, errmsg
def __transfer_dir_files(self, fileitem: FileItem, transfer_type: str,
target_storage: str, target_path: Path) -> Tuple[bool, str]:
def __transfer_dir_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
transfer_type: str, target_path: Path) -> Tuple[bool, str]:
"""
按目录结构整理目录下所有文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_path: 目标路径
:param transfer_type: 整理方式
"""
# 列出所有文件
storage_oper = self.__get_storage_oper(fileitem.storage)
if not storage_oper:
logger.error(f"不支持 {fileitem.storage} 的文件整理")
return False, f"不支持的文件存储:{fileitem.storage}"
file_list: List[FileItem] = storage_oper.list(fileitem)
file_list: List[FileItem] = source_oper.list(fileitem)
# 整理文件
for item in file_list:
if item.type == "dir":
# 递归整理目录
new_path = target_path / item.name
state, errmsg = self.__transfer_dir_files(fileitem=item,
transfer_type=transfer_type,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
transfer_type=transfer_type,
target_path=new_path)
if not state:
return False, errmsg
@@ -826,6 +836,8 @@ class FileManagerModule(_ModuleBase):
new_file = target_path / item.name
new_item, errmsg = self.__transfer_command(fileitem=item,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=new_file,
transfer_type=transfer_type)
if not new_item:
@@ -833,16 +845,23 @@ class FileManagerModule(_ModuleBase):
# 返回成功
return True, ""
def __transfer_file(self, fileitem: FileItem, mediainfo: MediaInfo, target_storage: str, target_file: Path,
transfer_type: str, over_flag: Optional[bool] = False) -> Tuple[Optional[FileItem], str]:
def __transfer_file(self, fileitem: FileItem, mediainfo: MediaInfo,
source_oper: StorageBase, target_oper: StorageBase,
target_storage: str, target_file: Path,
transfer_type: str, over_flag: Optional[bool] = False) -> Tuple[
Optional[FileItem], str]:
"""
整理一个文件,同时处理其他相关文件
:param fileitem: 原文件
:param mediainfo: 媒体信息
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_storage: 目标存储
:param target_file: 新文件
:param transfer_type: 整理方式
:param over_flag: 是否覆盖为True时会先删除再整理
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
"""
logger.info(f"正在整理文件:【{fileitem.storage}{fileitem.path} 到 【{target_storage}{target_file}"
f"操作类型:{transfer_type}")
@@ -874,12 +893,16 @@ class FileManagerModule(_ModuleBase):
target_file.unlink()
new_item, errmsg = self.__transfer_command(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
if new_item:
# 处理其他相关文件
self.__transfer_other_files(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
return new_item, errmsg
@@ -941,6 +964,8 @@ class FileManagerModule(_ModuleBase):
need_notify: Optional[bool] = True,
overwrite_mode: Optional[str] = None,
episodes_info: List[TmdbEpisode] = None,
source_oper: StorageBase = None,
target_oper: StorageBase = None
) -> TransferInfo:
"""
识别并整理一个文件或者一个目录下的所有文件
@@ -955,6 +980,8 @@ class FileManagerModule(_ModuleBase):
:param need_notify: 是否需要通知
:param overwrite_mode: 覆盖模式
:param episodes_info: 当前季的全部集信息
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:return: TransferInfo、错误信息
"""
@@ -977,6 +1004,8 @@ class FileManagerModule(_ModuleBase):
# 整理目录
new_diritem, errmsg = self.__transfer_dir(fileitem=fileitem,
mediainfo=mediainfo,
source_oper=source_oper,
target_oper=target_oper,
target_storage=target_storage,
target_path=new_path,
transfer_type=transfer_type)
@@ -1040,8 +1069,28 @@ class FileManagerModule(_ModuleBase):
# 判断是否要覆盖
overflag = False
# 源操作对象
if not source_oper:
source_oper = self.__get_storage_oper(fileitem.storage)
if not source_oper:
return TransferInfo(success=False,
message=f"不支持的存储类型:{fileitem.storage}",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify
)
# 目的操作对象
target_oper: StorageBase = self.__get_storage_oper(target_storage)
if not target_oper:
target_oper = self.__get_storage_oper(target_storage)
if not target_oper:
return TransferInfo(success=False,
message=f"不支持的存储类型:{target_storage}",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
# 计算重命名中的文件夹层级
rename_format_level = len(rename_format.split("/")) - 1
folder_path = new_file.parents[rename_format_level - 1]
@@ -1102,14 +1151,16 @@ class FileManagerModule(_ModuleBase):
if overwrite_mode == 'latest':
# 文件不存在,但仅保留最新版本
logger.info(f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ...")
self.__delete_version_files(target_storage, new_file)
self.__delete_version_files(target_oper, new_file)
# 整理文件
new_item, err_msg = self.__transfer_file(fileitem=fileitem,
mediainfo=mediainfo,
target_storage=target_storage,
target_file=new_file,
transfer_type=transfer_type,
over_flag=overflag)
over_flag=overflag,
source_oper=source_oper,
target_oper=target_oper)
if not new_item:
logger.error(f"文件 {fileitem.path} 整理失败:{err_msg}")
return TransferInfo(success=False,
@@ -1351,14 +1402,14 @@ class FileManagerModule(_ModuleBase):
logger.info(f"{mediainfo.title_year} 在本地文件系统中找到了这些季集:{seasons}")
return ExistMediaInfo(type=MediaType.TV, seasons=seasons)
def __delete_version_files(self, target_storage: str, path: Path) -> bool:
@staticmethod
def __delete_version_files(storage_oper: StorageBase, path: Path) -> bool:
"""
删除目录下的所有版本文件
:param target_storage: 存储类型
:param storage_oper: 存储操作对象
:param path: 目录路径
"""
# 存储
storage_oper = self.__get_storage_oper(target_storage)
if not storage_oper:
return False
# 识别文件中的季集信息