Files
MoviePilot/app/chain/transfer.py
2024-12-23 22:02:45 +08:00

878 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import queue
import re
import threading
from pathlib import Path
from queue import Queue
from typing import List, Optional, Tuple, Union, Dict, Callable, Any
from app.chain import ChainBase
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.chain.tmdb import TmdbChain
from app.core.config import settings, global_vars
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.models.downloadhistory import DownloadHistory
from app.db.models.transferhistory import TransferHistory
from app.db.systemconfig_oper import SystemConfigOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
downloader_lock = threading.Lock()
tree_lock = threading.Lock()
class TransferChain(ChainBase, metaclass=Singleton):
"""
文件整理处理链
"""
# 可处理的文件后缀
all_exts = settings.RMT_MEDIAEXT
# 待整理任务队列
_queue = Queue()
# 整理中的目录树
_job_tree: Dict[str, Dict[str, Any]] = {}
# 文件整理线程
_transfer_thread = None
# 文件整理检查间隔(秒)
_transfer_interval = 5
def __init__(self):
super().__init__()
self.downloadhis = DownloadHistoryOper()
self.transferhis = TransferHistoryOper()
self.progress = ProgressHelper()
self.mediachain = MediaChain()
self.tmdbchain = TmdbChain()
self.storagechain = StorageChain()
self.systemconfig = SystemConfigOper()
self.directoryhelper = DirectoryHelper()
# 启动整理任务
self.__init()
def __init(self):
"""
初始化
"""
# 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()
def __default_callback(self, task: TransferTask, transferinfo: TransferInfo, /):
"""
整理完成后处理
"""
if not transferinfo:
logger.error("文件转移模块运行失败")
return
if not transferinfo.success:
# 转移失败
logger.warn(f"{task.file_path.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
downloader=task.downloader,
download_hash=task.download_hash,
meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo
)
# 发送失败消息
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
image=task.mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
return
# 转移成功
logger.info(f"{task.file_path.name} 入库成功:{transferinfo.target_diritem.path}")
# 新增转移成功历史记录
self.transferhis.add_success(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
downloader=task.downloader,
download_hash=task.download_hash,
meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo
)
# 整理完成事件
self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
# TODO 整理完成一个媒体项时
if True:
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 下载器hash
if task.download_hash:
if self.remove_torrents(task.download_hash, downloader=task.downloader):
logger.info(f"移动模式删除种子成功:{task.download_hash} ")
# 删除残留目录
if task.fileitem:
self.storagechain.delete_media_file(task.fileitem, delete_self=False)
# TODO 发送通知
"""
if transferinfo.need_notify:
se_str = None
if task.media.type == MediaType.TV:
se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}"
self.send_transfer_message(meta=transfer_meta,
mediainfo=media,
transferinfo=transfer_info,
season_episode=se_str)
# TODO 刮削事件
if scrape or transfer_info.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': transfer_meta,
'mediainfo': media,
'fileitem': transfer_info.target_diritem
})
"""
def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None):
"""
添加到待整理队列
:param task: 任务信息
:param callback: 回调函数
"""
if not task:
return
# TODO 维护目录树
# 添加到队列
self._queue.put(TransferQueue(
task=task,
callback=callback or self.__default_callback
))
def __start_transfer(self):
"""
处理队列
"""
# 任务总数
total_num = 0
# 已处理总数
processed_num = 0
# 失败数量
fail_num = 0
# 跳过数量
skip_num = 0
# 队列开始标识
__queue_start = True
while not global_vars.is_system_stopped:
try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval)
if item:
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
# TODO 计算总数
total_num = 0
self.progress.update(value=0,
text=f"开始整理队列处理,共 {total_num} 个文件或子目录 ...",
key=ProgressKey.FileTransfer)
# 队列已开始
__queue_start = False
# TODO 重新计算总数
total_num = 0
# 更新进度
__process_msg = f"正在整理 {processed_num + 1}/{total_num}{item.task.fileitem.name} ..."
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 整理
self.__handle_transfer(task=item.task, callback=item.callback)
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个,跳过 {skip_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
self.progress.end(ProgressKey.FileTransfer)
# 重置计数
total_num = 0
processed_num = 0
fail_num = 0
skip_num = 0
# 标记为新队列
__queue_start = True
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e}")
def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None):
"""
处理整理任务
"""
if not task:
return
# 执行整理
transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem,
meta=task.meta,
mediainfo=task.mediainfo,
target_directory=task.target_directory,
target_storage=task.target_storage,
target_path=task.target_path,
transfer_type=task.transfer_type,
episodes_info=task.episodes_info,
scrape=task.scrape,
library_type_folder=task.library_type_folder,
library_category_folder=task.library_category_folder)
if not transferinfo:
logger.error("文件整理模块运行失败")
return
# 回调,位置传参:任务、整理结果
if callback:
callback(task, transferinfo)
def recommend_name(self, meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]:
"""
获取重命名后的名称
:param meta: 元数据
:param mediainfo: 媒体信息
:return: 重命名后的名称(含目录)
"""
return self.run_module("recommend_name", meta=meta, mediainfo=mediainfo)
def process(self) -> bool:
"""
获取下载器中的种子列表,并执行整理
"""
# 全局锁,避免重复处理
with downloader_lock:
# 获取下载器监控目录
download_dirs = self.directoryhelper.get_download_dirs()
# 如果没有下载器监控的目录则不处理
if not any(dir_info.monitor_type == "downloader" and dir_info.storage == "local"
for dir_info in download_dirs):
return True
logger.info("开始整理下载器中已经完成下载的文件 ...")
# 从下载器获取种子列表
torrents: Optional[List[TransferTorrent]] = self.list_torrents(status=TorrentStatus.TRANSFER)
if not torrents:
logger.info("没有已完成下载但未整理的任务")
return False
logger.info(f"获取到 {len(torrents)} 个已完成的下载任务")
for torrent in torrents:
if global_vars.is_system_stopped:
break
# 文件路径
file_path = torrent.path
if not file_path.exists():
logger.warn(f"文件不存在:{file_path}")
continue
# 检查是否为下载器监控目录中的文件
is_downloader_monitor = False
for dir_info in download_dirs:
if dir_info.monitor_type != "downloader":
continue
if not dir_info.download_path:
continue
if file_path.is_relative_to(Path(dir_info.download_path)):
is_downloader_monitor = True
break
if not is_downloader_monitor:
logger.debug(f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理")
continue
# 查询下载记录识别情况
downloadhis: DownloadHistory = self.downloadhis.get_by_hash(torrent.hash)
if downloadhis:
# 类型
try:
mtype = MediaType(downloadhis.type)
except ValueError:
mtype = MediaType.TV
# 按TMDBID识别
mediainfo = self.recognize_media(mtype=mtype,
tmdbid=downloadhis.tmdbid,
doubanid=downloadhis.doubanid)
if mediainfo:
# 补充图片
self.obtain_images(mediainfo)
# 更新自定义媒体类别
if downloadhis.media_category:
mediainfo.category = downloadhis.media_category
else:
# 非MoviePilot下载的任务按文件识别
mediainfo = None
# 执行整理,匹配源目录
state, errmsg = self.do_transfer(
fileitem=FileItem(
storage="local",
path=str(file_path),
type="dir" if not file_path.is_file() else "file",
name=file_path.name,
size=file_path.stat().st_size,
extension=file_path.suffix.lstrip('.'),
),
mediainfo=mediainfo,
downloader=torrent.downloader,
download_hash=torrent.hash,
src_match=True
)
# 设置下载任务状态
if state:
self.transfer_completed(hashs=torrent.hash)
# 结束
logger.info("所有下载器中下载完成的文件已整理完成")
return True
def __get_trans_fileitems(self, fileitem: FileItem) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
"""
def __is_bluray_dir(_fileitem: FileItem) -> bool:
"""
判断是不是蓝光目录
"""
subs = self.storagechain.list_files(_fileitem)
if subs:
for sub in subs:
if sub.type == "dir" and sub.name in ["BDMV", "CERTIFICATE"]:
return True
return False
def __is_bluray_sub(_path: str) -> bool:
"""
判断是否蓝光原盘目录内的子目录或文件
"""
return True if re.search(r"BDMV[/\\]STREAM", _path, re.IGNORECASE) else False
def __get_bluray_dir(_storage: str, _path: Path) -> Optional[FileItem]:
"""
获取蓝光原盘BDMV目录的上级目录
"""
for p in _path.parents:
if p.name == "BDMV":
return self.storagechain.get_file_item(storage=_storage, path=p.parent)
return None
if not self.storagechain.get_item(fileitem):
logger.warn(f"目录或文件不存在:{fileitem.path}")
return []
# 蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path):
return [(__get_bluray_dir(Path(fileitem.path)), True)]
# 蓝光原盘根目录
if __is_bluray_dir(fileitem):
return [(fileitem, True)]
# 单文件
if fileitem.type == "file":
return [(fileitem, False)]
# 需要整理的文件项列表
trans_items = []
# 先检查当前目录的下级目录,以支持合集的情况
for sub_dir in self.storagechain.list_files(fileitem):
if sub_dir.type == "dir":
if __is_bluray_dir(sub_dir):
trans_items.append((sub_dir, True))
else:
trans_items.append((sub_dir, False))
if not trans_items:
# 没有有效子目录,直接整理当前目录
trans_items.append((fileitem, False))
else:
# 有子目录时,把当前目录的文件添加到整理任务中
sub_items = self.storagechain.list_files(fileitem)
if sub_items:
sub_files = [f for f in sub_items if f.type == "file"]
if sub_files:
trans_items.extend((sub_files, False))
return trans_items
def do_transfer(self, fileitem: FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None,
target_directory: TransferDirectoryConf = None,
target_storage: str = None, target_path: Path = None,
transfer_type: str = None, scrape: bool = None,
library_type_folder: bool = None, library_category_folder: bool = None,
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, src_match: bool = False) -> Tuple[bool, str]:
"""
执行一个复杂目录的整理操作
:param fileitem: 文件项
:param meta: 元数据
:param mediainfo: 媒体信息
:param target_directory: 目标目录配置
:param target_storage: 目标存储器
:param target_path: 目标路径
:param transfer_type: 整理类型
:param scrape: 是否刮削元数据
:param library_type_folder: 媒体库类型子目录
:param library_category_folder: 媒体库类别子目录
:param season: 季
:param epformat: 剧集格式
:param min_filesize: 最小文件大小(MB)
:param downloader: 下载器
:param download_hash: 下载记录hash
:param force: 是否强制整理
:param src_match: 是否源目录匹配
返回:成功标识,错误信息
"""
# 自定义格式
formaterHandler = FormatParser(eformat=epformat.format,
details=epformat.detail,
part=epformat.part,
offset=epformat.offset) if epformat else None
# 整理屏蔽词
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
# 待整理文件列表
file_items: List[Tuple[FileItem, bool]] = []
# 汇总错误信息
err_msgs: List[str] = []
# 获取待整理路径清单
trans_items = self.__get_trans_fileitems(fileitem)
if not trans_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
# 处理所有待整理目录或文件
for trans_item, bluray_dir in trans_items:
# 如果是目录且不是⼀蓝光原盘,获取所有文件并整理
if trans_item.type == "dir" and not bluray_dir:
# 遍历获取下载目录所有文件(递归)
if files := self.storagechain.list_files(trans_item, recursion=True):
file_items.extend([(file, False) for file in files])
else:
file_items.append((trans_item, bluray_dir))
if formaterHandler:
# 有集自定义格式,过滤文件
file_items = [f for f in file_items if formaterHandler.match(f[0].name)]
# 过滤后缀和大小
file_items = [f for f in file_items
if f[0].extension and (f".{f[0].extension.lower()}" in self.all_exts
and (not min_filesize or f[0].size > min_filesize * 1024 * 1024))]
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
# 更新总文件数
total_num = len(file_items)
logger.info(f"正在整理 {total_num} 个文件...")
# 整理所有文件
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
break
file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
if file_item.path.find('/@Recycle/') != -1 \
or file_item.path.find('/#recycle/') != -1 \
or file_item.path.find('/.') != -1 \
or file_item.path.find('/@eaDir') != -1:
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
continue
# 整理屏蔽词不处理
is_blocked = False
if transfer_exclude_words:
for keyword in transfer_exclude_words:
if not keyword:
continue
if keyword and re.search(r"%s" % keyword, file_item.path, re.IGNORECASE):
logger.info(f"{file_item.path} 命中整理屏蔽词 {keyword},不处理")
is_blocked = True
break
if is_blocked:
err_msgs.append(f"{file_item.name} 命中整理屏蔽词")
continue
# 整理成功的不再处理
if not force:
transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage)
if transferd and transferd.status:
logger.info(f"{file_item.path} 已成功整理过,如需重新处理,请删除历史记录。")
continue
if not meta:
# 文件元数据
file_meta = MetaInfoPath(file_path)
else:
file_meta = meta
# 合并季
if season is not None:
file_meta.begin_season = season
if not file_meta:
logger.error(f"{file_path} 无法识别有效信息")
err_msgs.append(f"{file_path} 无法识别有效信息")
continue
# 自定义识别
if formaterHandler:
# 开始集、结束集、PART
begin_ep, end_ep, part = formaterHandler.split_episode(file_name=file_path.name, file_meta=file_meta)
if begin_ep is not None:
file_meta.begin_episode = begin_ep
file_meta.part = part
if end_ep is not None:
file_meta.end_episode = end_ep
# 根据父路径获取下载历史
download_history = None
if bluray_dir:
# 蓝光原盘,按目录名查询
download_history = self.downloadhis.get_by_path(str(file_path))
else:
# 按文件全路径查询
download_file = self.downloadhis.get_file_by_fullpath(str(file_path))
if download_file:
download_history = self.downloadhis.get_by_hash(download_file.download_hash)
# 获取下载Hash
if download_history and (not downloader or not download_hash):
downloader = download_history.downloader
download_hash = download_history.download_hash
if not mediainfo:
# 识别媒体信息
if download_history and (download_history.tmdbid or download_history.doubanid):
# 下载记录中已存在识别信息
file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type),
tmdbid=download_history.tmdbid,
doubanid=download_history.doubanid)
if mediainfo:
# 更新自定义媒体类别
if download_history.media_category:
mediainfo.category = download_history.media_category
else:
# 识别媒体信息
file_mediainfo = self.mediachain.recognize_by_meta(file_meta)
# 更新媒体图片
self.obtain_images(mediainfo=file_mediainfo)
else:
file_mediainfo = mediainfo
if not file_mediainfo:
logger.warn(f'{file_path} 未识别到媒体信息')
# 新增整理失败历史记录
his = self.transferhis.add_fail(
fileitem=file_item,
mode=transfer_type,
meta=file_meta,
downloader=downloader,
download_hash=download_hash
)
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{file_path.name} 未识别到媒体信息,无法入库!",
text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。",
link=settings.MP_DOMAIN('#/history')
))
continue
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=file_mediainfo.tmdb_id,
mtype=file_mediainfo.type.value)
if transfer_history:
file_mediainfo.title = transfer_history.title
logger.info(f"{file_path.name} 识别为:{file_mediainfo.type.value} {file_mediainfo.title_year}")
# 获取集数据
if file_mediainfo.type == MediaType.TV:
if file_meta.begin_season is None:
file_meta.begin_season = 1
file_mediainfo.season = file_mediainfo.season or file_meta.begin_season
episodes_info = self.tmdbchain.tmdb_episodes(
tmdbid=file_mediainfo.tmdb_id,
season=file_mediainfo.season
)
else:
episodes_info = None
# 查询整理目标目录
dir_info = None
if not target_directory:
if src_match:
# 按源目录匹配,以便找到更合适的目录配置
dir_info = self.directoryhelper.get_dir(media=file_mediainfo,
storage=file_item.storage,
src_path=file_path,
target_storage=target_storage)
elif target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
dir_info = self.directoryhelper.get_dir(media=file_mediainfo,
dest_path=target_path,
target_storage=target_storage)
else:
# 未指定目标路径,根据媒体信息获取目标目录
dir_info = self.directoryhelper.get_dir(file_mediainfo,
storage=file_item.storage,
target_storage=target_storage)
# 后台整理
self.__handle_transfer(task=TransferTask(
fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
))
return True, "\n".join(err_msgs)
def remote_transfer(self, arg_str: str, channel: MessageChannel,
userid: Union[str, int] = None, source: str = None):
"""
远程重新整理,参数 历史记录ID TMDBID|类型
"""
def args_error():
self.post_message(Notification(channel=channel, source=source,
title="请输入正确的命令格式:/redo [id] [tmdbid/豆瓣id]|[类型]"
"[id]历史记录编号", userid=userid))
if not arg_str:
args_error()
return
arg_strs = str(arg_str).split()
if len(arg_strs) != 2:
args_error()
return
# 历史记录ID
logid = arg_strs[0]
if not logid.isdigit():
args_error()
return
# TMDBID/豆瓣ID
id_strs = arg_strs[1].split('|')
media_id = id_strs[0]
if not logid.isdigit():
args_error()
return
# 类型
type_str = id_strs[1] if len(id_strs) > 1 else None
if not type_str or type_str not in [MediaType.MOVIE.value, MediaType.TV.value]:
args_error()
return
state, errmsg = self.__re_transfer(logid=int(logid),
mtype=MediaType(type_str),
mediaid=media_id)
if not state:
self.post_message(Notification(channel=channel, title="手动整理失败", source=source,
text=errmsg, userid=userid, link=settings.MP_DOMAIN('#/history')))
return
def __re_transfer(self, logid: int, mtype: MediaType = None,
mediaid: str = None) -> Tuple[bool, str]:
"""
根据历史记录,重新识别整理,只支持简单条件
:param logid: 历史记录ID
:param mtype: 媒体类型
:param mediaid: TMDB ID/豆瓣ID
"""
# 查询历史记录
history: TransferHistory = self.transferhis.get(logid)
if not history:
logger.error(f"历史记录不存在ID{logid}")
return False, "历史记录不存在"
# 按源目录路径重新整理
src_path = Path(history.src)
if not src_path.exists():
return False, f"源目录不存在:{src_path}"
# 查询媒体信息
if mtype and mediaid:
mediainfo = self.recognize_media(mtype=mtype, tmdbid=int(mediaid) if str(mediaid).isdigit() else None,
doubanid=mediaid)
if mediainfo:
# 更新媒体图片
self.obtain_images(mediainfo=mediainfo)
else:
mediainfo = self.mediachain.recognize_by_path(str(src_path))
if not mediainfo:
return False, f"未识别到媒体信息,类型:{mtype.value}id{mediaid}"
# 重新执行整理
logger.info(f"{src_path.name} 识别为:{mediainfo.title_year}")
# 删除旧的已整理文件
if history.dest_fileitem:
# 解析目标文件对象
dest_fileitem = FileItem(**history.dest_fileitem)
self.storagechain.delete_file(dest_fileitem)
# 强制整理
if history.src_fileitem:
state, errmsg = self.do_transfer(fileitem=FileItem(**history.src_fileitem),
mediainfo=mediainfo,
download_hash=history.download_hash,
force=True)
if not state:
return False, errmsg
return True, ""
def manual_transfer(self,
fileitem: FileItem,
target_storage: str = None,
target_path: Path = None,
tmdbid: int = None,
doubanid: str = None,
mtype: MediaType = None,
season: int = None,
transfer_type: str = None,
epformat: EpisodeFormat = None,
min_filesize: int = 0,
scrape: bool = None,
library_type_folder: bool = None,
library_category_folder: bool = None,
force: bool = False) -> Tuple[bool, Union[str, list]]:
"""
手动整理,支持复杂条件,带进度显示
:param fileitem: 文件项
:param target_storage: 目标存储
:param target_path: 目标路径
:param tmdbid: TMDB ID
:param doubanid: 豆瓣ID
:param mtype: 媒体类型
:param season: 季度
:param transfer_type: 整理类型
:param epformat: 剧集格式
:param min_filesize: 最小文件大小(MB)
:param scrape: 是否刮削元数据
:param library_type_folder: 是否按类型建立目录
:param library_category_folder: 是否按类别建立目录
:param force: 是否强制整理
"""
logger.info(f"手动整理:{fileitem.path} ...")
if tmdbid or doubanid:
# 有输入TMDBID时单个识别
# 识别媒体信息
mediainfo: MediaInfo = self.mediachain.recognize_media(tmdbid=tmdbid, doubanid=doubanid, mtype=mtype)
if not mediainfo:
return False, f"媒体信息识别失败tmdbid{tmdbid}doubanid{doubanid}type: {mtype.value}"
else:
# 更新媒体图片
self.obtain_images(mediainfo=mediainfo)
# 开始进度
self.progress.start(ProgressKey.FileTransfer)
self.progress.update(value=0,
text=f"开始整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 开始整理
state, errmsg = self.do_transfer(
fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
mediainfo=mediainfo,
transfer_type=transfer_type,
season=season,
epformat=epformat,
min_filesize=min_filesize,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
force=force,
)
if not state:
return False, errmsg
self.progress.end(ProgressKey.FileTransfer)
logger.info(f"{fileitem.path} 整理完成")
return True, ""
else:
# 没有输入TMDBID时按文件识别
state, errmsg = self.do_transfer(fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
season=season,
epformat=epformat,
min_filesize=min_filesize,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
force=force)
return state, errmsg
def send_transfer_message(self, meta: MetaBase, mediainfo: MediaInfo,
transferinfo: TransferInfo, season_episode: str = None):
"""
发送入库成功的消息
"""
msg_title = f"{mediainfo.title_year} {meta.season_episode if not season_episode else season_episode} 已入库"
if mediainfo.vote_average:
msg_str = f"评分:{mediainfo.vote_average},类型:{mediainfo.type.value}"
else:
msg_str = f"类型:{mediainfo.type.value}"
if mediainfo.category:
msg_str = f"{msg_str},类别:{mediainfo.category}"
if meta.resource_term:
msg_str = f"{msg_str},质量:{meta.resource_term}"
msg_str = f"{msg_str},共{transferinfo.file_count}个文件," \
f"大小:{StringUtils.str_filesize(transferinfo.total_size)}"
if transferinfo.message:
msg_str = f"{msg_str},以下文件处理失败:\n{transferinfo.message}"
# 发送
self.post_message(Notification(
mtype=NotificationType.Organize,
title=msg_title, text=msg_str, image=mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')))