TODO 后台整理队列

This commit is contained in:
jxxghp
2024-12-23 21:46:59 +08:00
parent 10f8efc457
commit c4300332c9
4 changed files with 317 additions and 654 deletions

View File

@@ -84,6 +84,12 @@ class StorageChain(ChainBase):
"""
return self.run_module("rename_file", fileitem=fileitem, name=name)
def get_item(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
查询目录或文件
"""
return self.get_file_item(storage=fileitem.storage, path=Path(fileitem.path))
def get_file_item(self, storage: str, path: Path) -> Optional[schemas.FileItem]:
"""
根据路径获取文件项

View File

@@ -3,7 +3,7 @@ import re
import threading
from pathlib import Path
from queue import Queue
from typing import List, Optional, Tuple, Union, Dict, Callable
from typing import List, Optional, Tuple, Union, Dict, Callable, Any
from app.chain import ChainBase
from app.chain.media import MediaChain
@@ -12,7 +12,7 @@ from app.chain.tmdb import TmdbChain
from app.core.config import settings, global_vars
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfoPath, MetaInfo
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.models.downloadhistory import DownloadHistory
from app.db.models.transferhistory import TransferHistory
@@ -28,9 +28,9 @@ from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey,
SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
lock = threading.Lock()
downloader_lock = threading.Lock()
tree_lock = threading.Lock()
class TransferChain(ChainBase, metaclass=Singleton):
@@ -44,6 +44,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 待整理任务队列
_queue = Queue()
# 整理中的目录树
_job_tree: Dict[str, Dict[str, Any]] = {}
# 文件整理线程
_transfer_thread = None
@@ -72,6 +75,92 @@ class TransferChain(ChainBase, metaclass=Singleton):
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()
def __default_callback(self, task: TransferTask, transferinfo: TransferInfo, /):
"""
整理完成后处理
"""
if not transferinfo:
logger.error("文件转移模块运行失败")
return
if not transferinfo.success:
# 转移失败
logger.warn(f"{task.file_path.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
downloader=task.downloader,
download_hash=task.download_hash,
meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo
)
# 发送失败消息
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
image=task.mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
return
# 转移成功
logger.info(f"{task.file_path.name} 入库成功:{transferinfo.target_diritem.path}")
# 新增转移成功历史记录
self.transferhis.add_success(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
downloader=task.downloader,
download_hash=task.download_hash,
meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo
)
# 整理完成事件
self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
# 整理完成一个媒体项时
if True:
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 下载器hash
if task.download_hash:
if self.remove_torrents(task.download_hash, downloader=task.downloader):
logger.info(f"移动模式删除种子成功:{task.download_hash} ")
# 删除残留目录
if task.fileitem:
self.storagechain.delete_media_file(task.fileitem, delete_self=False)
# TODO 发送通知
"""
if transferinfo.need_notify:
se_str = None
if task.media.type == MediaType.TV:
se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}"
self.send_transfer_message(meta=transfer_meta,
mediainfo=media,
transferinfo=transfer_info,
season_episode=se_str)
# TODO 刮削事件
if scrape or transfer_info.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': transfer_meta,
'mediainfo': media,
'fileitem': transfer_info.target_diritem
})
"""
def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None):
"""
添加到待整理队列
@@ -80,21 +169,70 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
if not task:
return
# TODO 维护目录树
# 添加到队列
self._queue.put(TransferQueue(
task=task,
callback=callback
callback=callback or self.__default_callback
))
def __start_transfer(self):
"""
处理队列
"""
# 任务总数
total_num = 0
# 已处理总数
processed_num = 0
# 失败数量
fail_num = 0
# 跳过数量
skip_num = 0
# 队列开始标识
__queue_start = True
while not global_vars.is_system_stopped:
try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval)
if item:
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
# TODO 计算总数
total_num = 0
self.progress.update(value=0,
text=f"开始整理队列处理,共 {total_num} 个文件或子目录 ...",
key=ProgressKey.FileTransfer)
# 队列已开始
__queue_start = False
# TODO 重新计算总数
total_num = 0
# 更新进度
__process_msg = f"正在整理 {processed_num + 1}/{total_num}{item.task.fileitem.name} ..."
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 整理
self.__handle_transfer(task=item.task, callback=item.callback)
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个,跳过 {skip_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
self.progress.end(ProgressKey.FileTransfer)
# 重置计数
total_num = 0
processed_num = 0
fail_num = 0
skip_num = 0
# 标记为新队列
__queue_start = True
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e}")
@@ -139,7 +277,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
# 全局锁,避免重复处理
with lock:
with downloader_lock:
# 获取下载器监控目录
download_dirs = self.directoryhelper.get_download_dirs()
# 如果没有下载器监控的目录则不处理
@@ -199,7 +337,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
mediainfo = None
# 执行整理,匹配源目录
state, errmsg = self.__do_transfer(
state, errmsg = self.do_transfer(
fileitem=FileItem(
storage="local",
path=str(file_path),
@@ -222,16 +360,85 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info("所有下载器中下载完成的文件已整理完成")
return True
def __do_transfer(self, fileitem: FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None,
target_directory: TransferDirectoryConf = None,
target_storage: str = None, target_path: Path = None,
transfer_type: str = None, scrape: bool = None,
library_type_folder: bool = None, library_category_folder: bool = None,
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, src_match: bool = False,
background: bool = False) -> Tuple[bool, str]:
def __get_trans_fileitems(self, fileitem: FileItem) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
"""
def __is_bluray_dir(_fileitem: FileItem) -> bool:
"""
判断是不是蓝光目录
"""
subs = self.storagechain.list_files(_fileitem)
if subs:
for sub in subs:
if sub.type == "dir" and sub.name in ["BDMV", "CERTIFICATE"]:
return True
return False
def __is_bluray_sub(_path: str) -> bool:
"""
判断是否蓝光原盘目录内的子目录或文件
"""
return True if re.search(r"BDMV[/\\]STREAM", _path, re.IGNORECASE) else False
def __get_bluray_dir(_path: Path) -> Optional[Path]:
"""
获取蓝光原盘BDMV目录的上级目录
"""
for p in _path.parents:
if p.name == "BDMV":
return p.parent
return None
if not self.storagechain.get_item(fileitem):
logger.warn(f"目录或文件不存在:{fileitem.path}")
return []
# 蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path):
return [(__get_bluray_dir(Path(fileitem.path)), True)]
# 蓝光原盘根目录
if __is_bluray_dir(fileitem):
return [(fileitem, True)]
# 单文件
if fileitem.type == "file":
return [(fileitem, False)]
# 需要整理的文件项列表
trans_items = []
# 先检查当前目录的下级目录,以支持合集的情况
for sub_dir in self.storagechain.list_files(fileitem):
if sub_dir.type == "dir":
if __is_bluray_dir(sub_dir):
trans_items.append((sub_dir, True))
else:
trans_items.append((sub_dir, False))
if not trans_items:
# 没有有效子目录,直接整理当前目录
trans_items.append((fileitem, False))
else:
# 有子目录时,把当前目录的文件添加到整理任务中
sub_items = self.storagechain.list_files(fileitem)
if sub_items:
sub_files = [f for f in sub_items if f.type == "file"]
if sub_files:
trans_items.extend((sub_files, False))
return trans_items
def do_transfer(self, fileitem: FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None,
target_directory: TransferDirectoryConf = None,
target_storage: str = None, target_path: Path = None,
transfer_type: str = None, scrape: bool = None,
library_type_folder: bool = None, library_category_folder: bool = None,
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, src_match: bool = False) -> Tuple[bool, str]:
"""
执行一个复杂目录的整理操作
:param fileitem: 文件项
@@ -251,16 +458,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param download_hash: 下载记录hash
:param force: 是否强制整理
:param src_match: 是否源目录匹配
:param background: 是否后台整理
返回:成功标识,错误信息
"""
def __callback(_task: TransferTask, _transferinfo: TransferInfo):
"""
TODO 整理完成回调
"""
pass
# 自定义格式
formaterHandler = FormatParser(eformat=epformat.format,
details=epformat.detail,
@@ -269,73 +469,36 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 整理屏蔽词
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
# 开始进度
self.progress.start(ProgressKey.FileTransfer)
# 汇总季集清单
season_episodes: Dict[Tuple, List[int]] = {}
# 汇总媒体信息
medias: Dict[Tuple, MediaInfo] = {}
# 汇总整理信息
transfers: Dict[Tuple, TransferInfo] = {}
# 待整理文件列表
file_items: List[FileItem] = []
# 蓝光目录列表
bluray: List[FileItem] = []
file_items: List[Tuple[FileItem, bool]] = []
# 汇总错误信息
err_msgs: List[str] = []
# 已处理数量
processed_num = 0
# 失败数量
fail_num = 0
# 跳过数量
skip_num = 0
# 本次整理方式
current_transfer_type = transfer_type
# 是否全部成功
all_success = True
# 获取待整理路径清单
trans_items = self.__get_trans_fileitems(fileitem)
# 总文件数
total_num = len(trans_items)
self.progress.update(value=0,
text=f"开始 {fileitem.path},共 {total_num} 个文件或子目录 ...",
key=ProgressKey.FileTransfer)
if not trans_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
# 处理所有待整理目录或文件,默认一个整理路径或文件只有一个媒体信息
for trans_item in trans_items:
item_path = Path(trans_item.path)
# 是否蓝光路径
bluray_dir = trans_item.storage == "local" and SystemUtils.is_bluray_dir(item_path)
for trans_item, bluray_dir in trans_items:
# 如果是目录且不是⼀蓝光原盘,获取所有文件并整理
if trans_item.type == "dir" and not bluray_dir:
# 遍历获取下载目录所有文件(递归)
if files := self.storagechain.list_files(trans_item, recursion=True):
file_items.extend(files)
# 如果是蓝光目录,计算⼤⼩
elif bluray_dir:
bluray.append(trans_item)
# 单个文件
file_items.extend([(file, False) for file in files])
else:
file_items.append(trans_item)
file_items.append((trans_item, bluray_dir))
if formaterHandler:
# 有集自定义格式,过滤文件
file_items = [f for f in file_items if formaterHandler.match(f.name)]
file_items = [f for f in file_items if formaterHandler.match(f[0].name)]
# 过滤后缀和大小
file_items = [f for f in file_items
if f.extension and (f".{f.extension.lower()}" in self.all_exts
and (not min_filesize or f.size > min_filesize * 1024 * 1024))]
# BDMV 跳过过滤
file_items.extend(bluray)
if f[0].extension and (f".{f[0].extension.lower()}" in self.all_exts
and (not min_filesize or f[0].size > min_filesize * 1024 * 1024))]
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
@@ -345,7 +508,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.info(f"正在整理 {total_num} 个文件...")
# 整理所有文件
for file_item in file_items:
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
break
file_path = Path(file_item.path)
@@ -355,9 +518,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
or file_item.path.find('/.') != -1 \
or file_item.path.find('/@eaDir') != -1:
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
# 计数
processed_num += 1
skip_num += 1
continue
# 整理屏蔽词不处理
@@ -372,10 +532,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
break
if is_blocked:
err_msgs.append(f"{file_item.name} 命中整理屏蔽词")
# 计数
processed_num += 1
skip_num += 1
all_success = False
continue
# 整理成功的不再处理
@@ -383,17 +539,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage)
if transferd and transferd.status:
logger.info(f"{file_item.path} 已成功整理过,如需重新处理,请删除历史记录。")
# 计数
processed_num += 1
skip_num += 1
all_success = False
continue
# 更新进度
self.progress.update(value=processed_num / total_num * 100,
text=f"正在整理 {processed_num + 1}/{total_num}{file_item.name} ...",
key=ProgressKey.FileTransfer)
if not meta:
# 文件元数据
file_meta = MetaInfoPath(file_path)
@@ -407,10 +554,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not file_meta:
logger.error(f"{file_path} 无法识别有效信息")
err_msgs.append(f"{file_path} 无法识别有效信息")
# 计数
processed_num += 1
fail_num += 1
all_success = False
continue
# 自定义识别
@@ -423,9 +566,38 @@ class TransferChain(ChainBase, metaclass=Singleton):
if end_ep is not None:
file_meta.end_episode = end_ep
# 根据父路径获取下载历史
download_history = None
if bluray_dir:
# 蓝光原盘,按目录名查询
download_history = self.downloadhis.get_by_path(str(file_path))
else:
# 按文件全路径查询
download_file = self.downloadhis.get_file_by_fullpath(str(file_path))
if download_file:
download_history = self.downloadhis.get_by_hash(download_file.download_hash)
# 获取下载Hash
if download_history and (not downloader or not download_hash):
downloader = download_history.downloader
download_hash = download_history.download_hash
if not mediainfo:
# 识别媒体信息
file_mediainfo = self.mediachain.recognize_by_meta(file_meta)
if download_history and (download_history.tmdbid or download_history.doubanid):
# 下载记录中已存在识别信息
file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type),
tmdbid=download_history.tmdbid,
doubanid=download_history.doubanid)
if mediainfo:
# 更新自定义媒体类别
if download_history.media_category:
mediainfo.category = download_history.media_category
else:
# 识别媒体信息
file_mediainfo = self.mediachain.recognize_by_meta(file_meta)
# 更新媒体图片
self.obtain_images(mediainfo=file_mediainfo)
else:
file_mediainfo = mediainfo
@@ -436,6 +608,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
fileitem=file_item,
mode=transfer_type,
meta=file_meta,
downloader=downloader,
download_hash=download_hash
)
self.post_message(Notification(
@@ -444,10 +617,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。",
link=settings.MP_DOMAIN('#/history')
))
# 计数
processed_num += 1
fail_num += 1
all_success = False
continue
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
@@ -471,12 +640,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
else:
episodes_info = None
# 获取下载hash
if not download_hash:
download_file = self.downloadhis.get_file_by_fullpath(file_item.path)
if download_file:
download_hash = download_file.download_hash
# 查询整理目标目录
dir_info = None
if not target_directory:
@@ -498,205 +661,24 @@ class TransferChain(ChainBase, metaclass=Singleton):
target_storage=target_storage)
# 后台整理
if background:
self.__handle_transfer(task=TransferTask(
fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
), callback=__callback)
continue
# 实时整理
transferinfo: TransferInfo = self.transfer(fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder)
if not transferinfo:
logger.error("文件整理模块运行失败")
return False, "文件整理模块运行失败"
if not transferinfo.success:
# 整理失败
logger.warn(f"{file_path.name} 入库失败:{transferinfo.message}")
err_msgs.append(f"{file_path.name} {transferinfo.message}")
# 新增整理失败历史记录
self.transferhis.add_fail(
fileitem=file_item,
mode=transfer_type,
download_hash=download_hash,
meta=file_meta,
mediainfo=file_mediainfo,
transferinfo=transferinfo
)
# 发送消息
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{file_mediainfo.title_year} {file_meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
image=file_mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
# 计数
processed_num += 1
fail_num += 1
all_success = False
continue
# 汇总信息
current_transfer_type = transferinfo.transfer_type
mkey = (file_mediainfo.tmdb_id, file_meta.begin_season)
if mkey not in medias:
# 新增信息
medias[mkey] = file_mediainfo
season_episodes[mkey] = file_meta.episode_list
transfers[mkey] = transferinfo
else:
# 合并季集清单
season_episodes[mkey] = list(set(season_episodes[mkey] + file_meta.episode_list))
# 合并整理数据
transfers[mkey].file_count += transferinfo.file_count
transfers[mkey].total_size += transferinfo.total_size
transfers[mkey].file_list.extend(transferinfo.file_list)
transfers[mkey].file_list_new.extend(transferinfo.file_list_new)
transfers[mkey].fail_list.extend(transferinfo.fail_list)
# 新增整理成功历史记录
self.transferhis.add_success(
self.__handle_transfer(task=TransferTask(
fileitem=file_item,
mode=transfer_type or transferinfo.transfer_type,
download_hash=download_hash,
meta=file_meta,
mediainfo=file_mediainfo,
transferinfo=transferinfo
)
# 整理完成事件
self.eventmanager.send_event(EventType.TransferComplete, {
'meta': file_meta,
'mediainfo': file_mediainfo,
'transferinfo': transferinfo,
'downloader': downloader,
'download_hash': download_hash,
})
# 更新进度
processed_num += 1
self.progress.update(value=processed_num / total_num * 100,
text=f"{file_path.name} 整理完成",
key=ProgressKey.FileTransfer)
# 目录或文件整理完成
self.progress.update(text=f"{fileitem.path} 整理完成,正在执行后续处理 ...",
key=ProgressKey.FileTransfer)
# 执行后续处理
for mkey, media in medias.items():
transfer_info = transfers[mkey]
transfer_meta = MetaInfo(transfer_info.target_diritem.name)
transfer_meta.begin_season = mkey[1]
# 发送通知
if transfer_info.need_notify:
se_str = None
if media.type == MediaType.TV:
se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}"
self.send_transfer_message(meta=transfer_meta,
mediainfo=media,
transferinfo=transfer_info,
season_episode=se_str)
# 刮削事件
if scrape or transfer_info.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': transfer_meta,
'mediainfo': media,
'fileitem': transfer_info.target_diritem
})
# 移动模式处理
if all_success and current_transfer_type in ["move"]:
# 下载器hash
if download_hash:
if self.remove_torrents(download_hash, downloader=downloader):
logger.info(f"移动模式删除种子成功:{download_hash} ")
# 删除残留目录
if fileitem:
self.storagechain.delete_media_file(fileitem, delete_self=False)
# 结束进度
logger.info(f"{fileitem.path} 整理完成,共 {total_num} 个文件,"
f"失败 {fail_num} 个,跳过 {skip_num}")
self.progress.update(value=100,
text=f"{fileitem.path} 整理完成,共 {total_num} 个文件,"
f"失败 {fail_num} 个,跳过 {skip_num}",
key=ProgressKey.FileTransfer)
# 结速进度
self.progress.end(ProgressKey.FileTransfer)
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
))
return True, "\n".join(err_msgs)
def __get_trans_fileitems(self, fileitem: FileItem):
"""
获取整理目录或文件列表
"""
file_path = Path(fileitem.path)
if fileitem.storage == "local" and not file_path.exists():
logger.warn(f"目录不存在:{fileitem.path}")
return []
# 单文件
if fileitem.type == "file":
return [fileitem]
# 蓝光原盘
if fileitem.storage == "local" and SystemUtils.is_bluray_dir(file_path):
return [fileitem]
# 需要整理的文件项列表
trans_items = []
# 先检查当前目录的下级目录,以支持合集的情况
for sub_dir in self.storagechain.list_files(fileitem):
subfile_path = Path(sub_dir.path)
# 添加蓝光原盘
if sub_dir.storage == "local" \
and sub_dir.type == "dir" \
and SystemUtils.is_bluray_dir(subfile_path):
trans_items.append(sub_dir)
# 添加目录
elif sub_dir.type == "dir":
trans_items.append(sub_dir)
if not trans_items:
# 没有有效子目录,直接整理当前目录
trans_items.append(fileitem)
else:
# 有子目录时,把当前目录的文件添加到整理任务中
sub_items = self.storagechain.list_files(fileitem)
if sub_items:
sub_files = [f for f in sub_items if f.type == "file"]
if sub_files:
trans_items.extend(sub_files)
return trans_items
def remote_transfer(self, arg_str: str, channel: MessageChannel,
userid: Union[str, int] = None, source: str = None):
"""
@@ -778,10 +760,10 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 强制整理
if history.src_fileitem:
state, errmsg = self.__do_transfer(fileitem=FileItem(**history.src_fileitem),
mediainfo=mediainfo,
download_hash=history.download_hash,
force=True)
state, errmsg = self.do_transfer(fileitem=FileItem(**history.src_fileitem),
mediainfo=mediainfo,
download_hash=history.download_hash,
force=True)
if not state:
return False, errmsg
@@ -835,7 +817,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=f"开始整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 开始整理
state, errmsg = self.__do_transfer(
state, errmsg = self.do_transfer(
fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
@@ -857,17 +839,17 @@ class TransferChain(ChainBase, metaclass=Singleton):
return True, ""
else:
# 没有输入TMDBID时按文件识别
state, errmsg = self.__do_transfer(fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
season=season,
epformat=epformat,
min_filesize=min_filesize,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
force=force)
state, errmsg = self.do_transfer(fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
season=season,
epformat=epformat,
min_filesize=min_filesize,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
force=force)
return state, errmsg
def send_transfer_message(self, meta: MetaBase, mediainfo: MediaInfo,

View File

@@ -120,7 +120,7 @@ class TransferHistoryOper(DbOper):
def add_success(self, fileitem: FileItem, mode: str, meta: MetaBase,
mediainfo: MediaInfo, transferinfo: TransferInfo,
download_hash: str = None):
downloader: str = None, download_hash: str = None):
"""
新增转移成功历史记录
"""
@@ -143,13 +143,14 @@ class TransferHistoryOper(DbOper):
seasons=meta.season,
episodes=meta.episode,
image=mediainfo.get_poster_image(),
downloader=downloader,
download_hash=download_hash,
status=1,
files=transferinfo.file_list
)
def add_fail(self, fileitem: FileItem, mode: str, meta: MetaBase, mediainfo: MediaInfo = None,
transferinfo: TransferInfo = None, download_hash: str = None):
transferinfo: TransferInfo = None, downloader: str = None, download_hash: str = None):
"""
新增转移失败历史记录
"""
@@ -173,6 +174,7 @@ class TransferHistoryOper(DbOper):
seasons=meta.season,
episodes=meta.episode,
image=mediainfo.get_poster_image(),
downloader=downloader,
download_hash=download_hash,
status=0,
errmsg=transferinfo.message or '未知错误',
@@ -188,6 +190,7 @@ class TransferHistoryOper(DbOper):
mode=mode,
seasons=meta.season,
episodes=meta.episode,
downloader=downloader,
download_hash=download_hash,
status=0,
errmsg="未识别到媒体信息"

View File

@@ -1,6 +1,4 @@
import datetime
import platform
import re
import threading
import traceback
from pathlib import Path
@@ -17,19 +15,14 @@ from app.chain.storage import StorageChain
from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.event import EventManager
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.systemconfig_oper import SystemConfigOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import FileItem, TransferInfo, Notification, TransferTask
from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType
from app.schemas import FileItem, TransferTask
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
lock = Lock()
snapshot_lock = Lock()
@@ -76,12 +69,6 @@ class Monitor(metaclass=Singleton):
# 存储过照间隔(分钟)
_snapshot_interval = 5
# 消息汇总
_msg_medias = {}
# 消息汇总间隔(秒)
_msg_interval = 60
def __init__(self):
super().__init__()
self.chain = MonitorChain()
@@ -166,9 +153,6 @@ class Monitor(metaclass=Singleton):
'storage': mon_dir.storage,
'mon_path': mon_path
})
# 追加入库消息统一发送服务
self._scheduler.add_job(self.__send_msg, trigger='interval', seconds=15)
# 启动定时服务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
@@ -231,74 +215,8 @@ class Monitor(metaclass=Singleton):
"""
添加到整理队列
"""
def __callback(_task: TransferTask, _transferinfo: TransferInfo, /):
"""
整理完成后处理
"""
if not _transferinfo:
logger.error("文件转移模块运行失败")
return
if not _transferinfo.success:
# 转移失败
logger.warn(f"{_task.file_path.name} 入库失败:{_transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
fileitem=_task.fileitem,
mode=_transferinfo.transfer_type if _transferinfo else '',
download_hash=_task.download_hash,
meta=_task.meta,
mediainfo=_task.mediainfo,
transferinfo=_transferinfo
)
# 发送失败消息
self.chain.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{_task.mediainfo.title_year} {_task.meta.season_episode} 入库失败!",
text=f"原因:{_transferinfo.message or '未知'}",
image=_task.mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
return
# 转移成功
logger.info(f"{_task.file_path.name} 入库成功:{_transferinfo.target_diritem.path}")
# 新增转移成功历史记录
self.transferhis.add_success(
fileitem=_task.fileitem,
mode=_transferinfo.transfer_type if _transferinfo else '',
download_hash=_task.download_hash,
meta=_task.meta,
mediainfo=_task.mediainfo,
transferinfo=_transferinfo
)
# 汇总刮削
if _transferinfo.need_scrape:
self.mediaChain.scrape_metadata(fileitem=_transferinfo.target_diritem,
meta=_task.meta,
mediainfo=_task.mediainfo)
# 广播事件
EventManager().send_event(EventType.TransferComplete, {
'fileitem': _task.fileitem,
'meta': _task.meta,
'mediainfo': _task.mediainfo,
'transferinfo': _transferinfo
})
# 发送消息汇总
if _transferinfo.need_notify:
self.__collect_msg_medias(mediainfo=_task.mediainfo, file_meta=_task.meta,
transferinfo=_transferinfo)
# 移动模式删除空目录
if _transferinfo.transfer_type in ["move"]:
self.storagechain.delete_media_file(_task.fileitem, delete_self=False)
# 加入整理队列
self.transferchain.put_to_queue(task=task, callback=__callback)
# 加入整理队列,使用默认的回调函数
self.transferchain.put_to_queue(task=task)
def __handle_file(self, storage: str, event_path: Path):
"""
@@ -306,271 +224,25 @@ class Monitor(metaclass=Singleton):
:param storage: 存储
:param event_path: 事件文件路径
"""
def __get_bluray_dir(_path: Path):
"""
获取BDMV目录的上级目录
"""
for p in _path.parents:
if p.name == "BDMV":
return p.parent
return None
# 全程加锁
with lock:
try:
# 回收站及隐藏的文件不处
if str(event_path).find('/@Recycle/') != -1 \
or str(event_path).find('/#recycle/') != -1 \
or str(event_path).find('/.') != -1 \
or str(event_path).find('/@eaDir') != -1:
logger.debug(f"{event_path} 是回收站或隐藏的文件")
return
# 不是媒体文件不处理
if event_path.suffix.lower() not in self.all_exts:
logger.debug(f"{event_path} 不是媒体文件")
return
# 整理屏蔽词不处理
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
if transfer_exclude_words:
for keyword in transfer_exclude_words:
if not keyword:
continue
if keyword and re.search(r"%s" % keyword, str(event_path), re.IGNORECASE):
logger.info(f"{event_path} 命中整理屏蔽词 {keyword},不处理")
return
# 判断是不是蓝光目录
bluray_flag = False
if re.search(r"BDMV[/\\]STREAM", str(event_path), re.IGNORECASE):
bluray_flag = True
# 截取BDMV前面的路径
event_path = __get_bluray_dir(event_path)
logger.info(f"{event_path} 是蓝光原盘目录,更正文件路径为:{event_path}")
# 查询历史记录,已转移的不处理
if self.transferhis.get_by_src(str(event_path), storage=storage):
logger.info(f"{event_path} 已经整理过了")
return
# 元数据
file_meta = MetaInfoPath(event_path)
if not file_meta.name:
logger.error(f"{event_path.name} 无法识别有效信息")
return
# 根据父路径获取下载历史
download_history = None
if bluray_flag:
# 蓝光原盘,按目录名查询
download_history = self.downloadhis.get_by_path(str(event_path))
else:
# 按文件全路径查询
download_file = self.downloadhis.get_file_by_fullpath(str(event_path))
if download_file:
download_history = self.downloadhis.get_by_hash(download_file.download_hash)
# 获取下载Hash
downloader, download_hash = None, None
if download_history:
downloader = download_history.downloader
download_hash = download_history.download_hash
# 识别媒体信息
if download_history and (download_history.tmdbid or download_history.doubanid):
# 下载记录中已存在识别信息
mediainfo: MediaInfo = self.mediaChain.recognize_media(mtype=MediaType(download_history.type),
tmdbid=download_history.tmdbid,
doubanid=download_history.doubanid)
if mediainfo:
# 更新自定义媒体类别
if download_history.media_category:
mediainfo.category = download_history.media_category
else:
mediainfo: MediaInfo = self.mediaChain.recognize_by_meta(file_meta)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{file_meta.name}')
# 新增转移失败历史记录
his = self.transferhis.add_fail(
fileitem=FileItem(
storage=storage,
type="file",
path=str(event_path),
name=event_path.name,
basename=event_path.stem,
extension=event_path.suffix[1:],
),
mode='',
meta=file_meta,
download_hash=download_hash
)
self.chain.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{event_path.name} 未识别到媒体信息,无法入库!",
text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别转移。",
link=settings.MP_DOMAIN('#/history')
))
return
# 查询转移目的目录
dir_info = self.directoryhelper.get_dir(mediainfo, storage=storage, src_path=event_path)
if not dir_info:
logger.warn(f"{event_path.name} 未找到对应的目标目录")
return
# 查找这个文件项
file_item = self.storagechain.get_file_item(storage=storage, path=event_path)
if not file_item:
logger.warn(f"{event_path.name} 未找到对应的文件")
return
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
if transfer_history:
mediainfo.title = transfer_history.title
logger.info(f"{event_path.name} 识别为:{mediainfo.type.value} {mediainfo.title_year}")
# 更新媒体图片
self.chain.obtain_images(mediainfo=mediainfo)
# 获取集数据
if mediainfo.type == MediaType.TV:
episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id,
season=file_meta.begin_season or 1)
else:
episodes_info = None
# 进入队列
self.__transfer_queue(
TransferTask(
fileitem=file_item,
file_path=event_path,
meta=file_meta,
mediainfo=mediainfo,
target_directory=dir_info,
episodes_info=episodes_info,
downloader=downloader,
download_hash=download_hash
)
# 开始整
# TODO 缺少文件大小
self.transferchain.do_transfer(
fileitem=FileItem(
storage=storage,
path=str(event_path),
type="file",
name=event_path.name,
basename=event_path.stem,
extension=event_path.suffix[1:]
),
src_match=True
)
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))
def __collect_msg_medias(self, mediainfo: MediaInfo, file_meta: MetaInfoPath, transferinfo: TransferInfo):
"""
收集媒体处理完的消息
"""
media_list = self._msg_medias.get(mediainfo.title_year + " " + file_meta.season) or {}
if media_list:
media_files = media_list.get("files") or []
if media_files:
file_exists = False
for file in media_files:
if str(transferinfo.fileitem.path) == file.get("path"):
file_exists = True
break
if not file_exists:
media_files.append({
"path": str(transferinfo.fileitem.path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
})
else:
media_files = [
{
"path": str(transferinfo.fileitem.path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
]
media_list = {
"files": media_files,
"time": datetime.datetime.now()
}
else:
media_list = {
"files": [
{
"path": str(transferinfo.fileitem.path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
],
"time": datetime.datetime.now()
}
self._msg_medias[mediainfo.title_year + " " + file_meta.season] = media_list
def __send_msg(self):
"""
定时检查是否有媒体处理完,发送统一消息
"""
if not self._msg_medias or not self._msg_medias.keys():
return
# 遍历检查是否已刮削完,发送消息
for medis_title_year_season in list(self._msg_medias.keys()):
media_list = self._msg_medias.get(medis_title_year_season)
logger.info(f"开始处理媒体 {medis_title_year_season} 消息")
if not media_list:
continue
# 获取最后更新时间
last_update_time = media_list.get("time")
media_files = media_list.get("files")
if not last_update_time or not media_files:
continue
transferinfo = media_files[0].get("transferinfo")
file_meta = media_files[0].get("file_meta")
mediainfo = media_files[0].get("mediainfo")
# 判断剧集最后更新时间距现在是已超过10秒或者电影发送消息
if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._msg_interval) \
or mediainfo.type == MediaType.MOVIE:
# 汇总处理文件总大小
total_size = 0
file_count = 0
# 剧集汇总
episodes = []
for file in media_files:
transferinfo = file.get("transferinfo")
total_size += transferinfo.total_size
file_count += 1
file_meta = file.get("file_meta")
if file_meta and file_meta.begin_episode:
episodes.append(file_meta.begin_episode)
transferinfo.total_size = total_size
# 汇总处理文件数量
transferinfo.file_count = file_count
# 剧集季集信息 S01 E01-E04 || S01 E01、E02、E04
season_episode = None
# 处理文件多,说明是剧集,显示季入库消息
if mediainfo.type == MediaType.TV:
# 季集文本
season_episode = f"{file_meta.season} {StringUtils.format_ep(episodes)}"
# 发送消息
self.transferchain.send_transfer_message(meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo,
season_episode=season_episode)
# 发送完消息移出key
del self._msg_medias[medis_title_year_season]
continue
def stop(self):
"""
退出插件