From c4300332c953e884e8b3782a1c042cba1b3d4072 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 23 Dec 2024 21:46:59 +0800 Subject: [PATCH] =?UTF-8?q?TODO=20=E5=90=8E=E5=8F=B0=E6=95=B4=E7=90=86?= =?UTF-8?q?=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/storage.py | 6 + app/chain/transfer.py | 600 ++++++++++++++++----------------- app/db/transferhistory_oper.py | 7 +- app/monitor.py | 358 +------------------- 4 files changed, 317 insertions(+), 654 deletions(-) diff --git a/app/chain/storage.py b/app/chain/storage.py index 17db3f83..7b21bf4f 100644 --- a/app/chain/storage.py +++ b/app/chain/storage.py @@ -84,6 +84,12 @@ class StorageChain(ChainBase): """ return self.run_module("rename_file", fileitem=fileitem, name=name) + def get_item(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]: + """ + 查询目录或文件 + """ + return self.get_file_item(storage=fileitem.storage, path=Path(fileitem.path)) + def get_file_item(self, storage: str, path: Path) -> Optional[schemas.FileItem]: """ 根据路径获取文件项 diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 396bdbcf..7fbac965 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -3,7 +3,7 @@ import re import threading from pathlib import Path from queue import Queue -from typing import List, Optional, Tuple, Union, Dict, Callable +from typing import List, Optional, Tuple, Union, Dict, Callable, Any from app.chain import ChainBase from app.chain.media import MediaChain @@ -12,7 +12,7 @@ from app.chain.tmdb import TmdbChain from app.core.config import settings, global_vars from app.core.context import MediaInfo from app.core.meta import MetaBase -from app.core.metainfo import MetaInfoPath, MetaInfo +from app.core.metainfo import MetaInfoPath from app.db.downloadhistory_oper import DownloadHistoryOper from app.db.models.downloadhistory import DownloadHistory from app.db.models.transferhistory import TransferHistory @@ -28,9 +28,9 @@ from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, SystemConfigKey from app.utils.singleton import Singleton from app.utils.string import StringUtils -from app.utils.system import SystemUtils -lock = threading.Lock() +downloader_lock = threading.Lock() +tree_lock = threading.Lock() class TransferChain(ChainBase, metaclass=Singleton): @@ -44,6 +44,9 @@ class TransferChain(ChainBase, metaclass=Singleton): # 待整理任务队列 _queue = Queue() + # 整理中的目录树 + _job_tree: Dict[str, Dict[str, Any]] = {} + # 文件整理线程 _transfer_thread = None @@ -72,6 +75,92 @@ class TransferChain(ChainBase, metaclass=Singleton): self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) self._transfer_thread.start() + def __default_callback(self, task: TransferTask, transferinfo: TransferInfo, /): + """ + 整理完成后处理 + """ + if not transferinfo: + logger.error("文件转移模块运行失败") + return + + if not transferinfo.success: + # 转移失败 + logger.warn(f"{task.file_path.name} 入库失败:{transferinfo.message}") + # 新增转移失败历史记录 + self.transferhis.add_fail( + fileitem=task.fileitem, + mode=transferinfo.transfer_type if transferinfo else '', + downloader=task.downloader, + download_hash=task.download_hash, + meta=task.meta, + mediainfo=task.mediainfo, + transferinfo=transferinfo + ) + # 发送失败消息 + self.post_message(Notification( + mtype=NotificationType.Manual, + title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!", + text=f"原因:{transferinfo.message or '未知'}", + image=task.mediainfo.get_message_image(), + link=settings.MP_DOMAIN('#/history') + )) + return + + # 转移成功 + logger.info(f"{task.file_path.name} 入库成功:{transferinfo.target_diritem.path}") + + # 新增转移成功历史记录 + self.transferhis.add_success( + fileitem=task.fileitem, + mode=transferinfo.transfer_type if transferinfo else '', + downloader=task.downloader, + download_hash=task.download_hash, + meta=task.meta, + mediainfo=task.mediainfo, + transferinfo=transferinfo + ) + + # 整理完成事件 + self.eventmanager.send_event(EventType.TransferComplete, { + 'fileitem': task.fileitem, + 'meta': task.meta, + 'mediainfo': task.mediainfo, + 'transferinfo': transferinfo, + 'downloader': task.downloader, + 'download_hash': task.download_hash, + }) + + # 整理完成一个媒体项时 + if True: + # 移动模式删除空目录 + if transferinfo.transfer_type in ["move"]: + # 下载器hash + if task.download_hash: + if self.remove_torrents(task.download_hash, downloader=task.downloader): + logger.info(f"移动模式删除种子成功:{task.download_hash} ") + # 删除残留目录 + if task.fileitem: + self.storagechain.delete_media_file(task.fileitem, delete_self=False) + + # TODO 发送通知 + """ + if transferinfo.need_notify: + se_str = None + if task.media.type == MediaType.TV: + se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}" + self.send_transfer_message(meta=transfer_meta, + mediainfo=media, + transferinfo=transfer_info, + season_episode=se_str) + # TODO 刮削事件 + if scrape or transfer_info.need_scrape: + self.eventmanager.send_event(EventType.MetadataScrape, { + 'meta': transfer_meta, + 'mediainfo': media, + 'fileitem': transfer_info.target_diritem + }) + """ + def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None): """ 添加到待整理队列 @@ -80,21 +169,70 @@ class TransferChain(ChainBase, metaclass=Singleton): """ if not task: return + # TODO 维护目录树 + + # 添加到队列 self._queue.put(TransferQueue( task=task, - callback=callback + callback=callback or self.__default_callback )) def __start_transfer(self): """ 处理队列 """ + # 任务总数 + total_num = 0 + # 已处理总数 + processed_num = 0 + # 失败数量 + fail_num = 0 + # 跳过数量 + skip_num = 0 + # 队列开始标识 + __queue_start = True + while not global_vars.is_system_stopped: try: item: TransferQueue = self._queue.get(timeout=self._transfer_interval) if item: + if __queue_start: + logger.info("开始整理队列处理...") + # 启动进度 + self.progress.start(ProgressKey.FileTransfer) + # TODO 计算总数 + total_num = 0 + self.progress.update(value=0, + text=f"开始整理队列处理,共 {total_num} 个文件或子目录 ...", + key=ProgressKey.FileTransfer) + # 队列已开始 + __queue_start = False + # TODO 重新计算总数 + total_num = 0 + # 更新进度 + __process_msg = f"正在整理 ({processed_num + 1}/{total_num}){item.task.fileitem.name} ..." + logger.info(__process_msg) + self.progress.update(value=processed_num / total_num * 100, + text=__process_msg, + key=ProgressKey.FileTransfer) + # 整理 self.__handle_transfer(task=item.task, callback=item.callback) except queue.Empty: + if not __queue_start: + # 结束进度 + __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个,跳过 {skip_num} 个" + logger.info(__end_msg) + self.progress.update(value=100, + text=__end_msg, + key=ProgressKey.FileTransfer) + self.progress.end(ProgressKey.FileTransfer) + # 重置计数 + total_num = 0 + processed_num = 0 + fail_num = 0 + skip_num = 0 + # 标记为新队列 + __queue_start = True continue except Exception as e: logger.error(f"整理队列处理出现错误:{e}") @@ -139,7 +277,7 @@ class TransferChain(ChainBase, metaclass=Singleton): """ # 全局锁,避免重复处理 - with lock: + with downloader_lock: # 获取下载器监控目录 download_dirs = self.directoryhelper.get_download_dirs() # 如果没有下载器监控的目录则不处理 @@ -199,7 +337,7 @@ class TransferChain(ChainBase, metaclass=Singleton): mediainfo = None # 执行整理,匹配源目录 - state, errmsg = self.__do_transfer( + state, errmsg = self.do_transfer( fileitem=FileItem( storage="local", path=str(file_path), @@ -222,16 +360,85 @@ class TransferChain(ChainBase, metaclass=Singleton): logger.info("所有下载器中下载完成的文件已整理完成") return True - def __do_transfer(self, fileitem: FileItem, - meta: MetaBase = None, mediainfo: MediaInfo = None, - target_directory: TransferDirectoryConf = None, - target_storage: str = None, target_path: Path = None, - transfer_type: str = None, scrape: bool = None, - library_type_folder: bool = None, library_category_folder: bool = None, - season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0, - downloader: str = None, download_hash: str = None, - force: bool = False, src_match: bool = False, - background: bool = False) -> Tuple[bool, str]: + def __get_trans_fileitems(self, fileitem: FileItem) -> List[Tuple[FileItem, bool]]: + """ + 获取整理目录或文件列表 + """ + + def __is_bluray_dir(_fileitem: FileItem) -> bool: + """ + 判断是不是蓝光目录 + """ + subs = self.storagechain.list_files(_fileitem) + if subs: + for sub in subs: + if sub.type == "dir" and sub.name in ["BDMV", "CERTIFICATE"]: + return True + return False + + def __is_bluray_sub(_path: str) -> bool: + """ + 判断是否蓝光原盘目录内的子目录或文件 + """ + return True if re.search(r"BDMV[/\\]STREAM", _path, re.IGNORECASE) else False + + def __get_bluray_dir(_path: Path) -> Optional[Path]: + """ + 获取蓝光原盘BDMV目录的上级目录 + """ + for p in _path.parents: + if p.name == "BDMV": + return p.parent + return None + + if not self.storagechain.get_item(fileitem): + logger.warn(f"目录或文件不存在:{fileitem.path}") + return [] + + # 蓝光原盘子目录或文件 + if __is_bluray_sub(fileitem.path): + return [(__get_bluray_dir(Path(fileitem.path)), True)] + + # 蓝光原盘根目录 + if __is_bluray_dir(fileitem): + return [(fileitem, True)] + + # 单文件 + if fileitem.type == "file": + return [(fileitem, False)] + + # 需要整理的文件项列表 + trans_items = [] + # 先检查当前目录的下级目录,以支持合集的情况 + for sub_dir in self.storagechain.list_files(fileitem): + if sub_dir.type == "dir": + if __is_bluray_dir(sub_dir): + trans_items.append((sub_dir, True)) + else: + trans_items.append((sub_dir, False)) + + if not trans_items: + # 没有有效子目录,直接整理当前目录 + trans_items.append((fileitem, False)) + else: + # 有子目录时,把当前目录的文件添加到整理任务中 + sub_items = self.storagechain.list_files(fileitem) + if sub_items: + sub_files = [f for f in sub_items if f.type == "file"] + if sub_files: + trans_items.extend((sub_files, False)) + + return trans_items + + def do_transfer(self, fileitem: FileItem, + meta: MetaBase = None, mediainfo: MediaInfo = None, + target_directory: TransferDirectoryConf = None, + target_storage: str = None, target_path: Path = None, + transfer_type: str = None, scrape: bool = None, + library_type_folder: bool = None, library_category_folder: bool = None, + season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0, + downloader: str = None, download_hash: str = None, + force: bool = False, src_match: bool = False) -> Tuple[bool, str]: """ 执行一个复杂目录的整理操作 :param fileitem: 文件项 @@ -251,16 +458,9 @@ class TransferChain(ChainBase, metaclass=Singleton): :param download_hash: 下载记录hash :param force: 是否强制整理 :param src_match: 是否源目录匹配 - :param background: 是否后台整理 返回:成功标识,错误信息 """ - def __callback(_task: TransferTask, _transferinfo: TransferInfo): - """ - TODO 整理完成回调 - """ - pass - # 自定义格式 formaterHandler = FormatParser(eformat=epformat.format, details=epformat.detail, @@ -269,73 +469,36 @@ class TransferChain(ChainBase, metaclass=Singleton): # 整理屏蔽词 transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords) - - # 开始进度 - self.progress.start(ProgressKey.FileTransfer) - - # 汇总季集清单 - season_episodes: Dict[Tuple, List[int]] = {} - # 汇总媒体信息 - medias: Dict[Tuple, MediaInfo] = {} - # 汇总整理信息 - transfers: Dict[Tuple, TransferInfo] = {} - # 待整理文件列表 - file_items: List[FileItem] = [] - # 蓝光目录列表 - bluray: List[FileItem] = [] + file_items: List[Tuple[FileItem, bool]] = [] # 汇总错误信息 err_msgs: List[str] = [] - # 已处理数量 - processed_num = 0 - # 失败数量 - fail_num = 0 - # 跳过数量 - skip_num = 0 - # 本次整理方式 - current_transfer_type = transfer_type - # 是否全部成功 - all_success = True # 获取待整理路径清单 trans_items = self.__get_trans_fileitems(fileitem) - # 总文件数 - total_num = len(trans_items) - self.progress.update(value=0, - text=f"开始 {fileitem.path},共 {total_num} 个文件或子目录 ...", - key=ProgressKey.FileTransfer) if not trans_items: logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") return False, f"{fileitem.name} 没有找到可整理的媒体文件" # 处理所有待整理目录或文件,默认一个整理路径或文件只有一个媒体信息 - for trans_item in trans_items: - item_path = Path(trans_item.path) - # 是否蓝光路径 - bluray_dir = trans_item.storage == "local" and SystemUtils.is_bluray_dir(item_path) + for trans_item, bluray_dir in trans_items: # 如果是目录且不是⼀蓝光原盘,获取所有文件并整理 if trans_item.type == "dir" and not bluray_dir: # 遍历获取下载目录所有文件(递归) if files := self.storagechain.list_files(trans_item, recursion=True): - file_items.extend(files) - # 如果是蓝光目录,计算⼤⼩ - elif bluray_dir: - bluray.append(trans_item) - # 单个文件 + file_items.extend([(file, False) for file in files]) else: - file_items.append(trans_item) + file_items.append((trans_item, bluray_dir)) if formaterHandler: # 有集自定义格式,过滤文件 - file_items = [f for f in file_items if formaterHandler.match(f.name)] + file_items = [f for f in file_items if formaterHandler.match(f[0].name)] # 过滤后缀和大小 file_items = [f for f in file_items - if f.extension and (f".{f.extension.lower()}" in self.all_exts - and (not min_filesize or f.size > min_filesize * 1024 * 1024))] - # BDMV 跳过过滤 - file_items.extend(bluray) + if f[0].extension and (f".{f[0].extension.lower()}" in self.all_exts + and (not min_filesize or f[0].size > min_filesize * 1024 * 1024))] if not file_items: logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") return False, f"{fileitem.name} 没有找到可整理的媒体文件" @@ -345,7 +508,7 @@ class TransferChain(ChainBase, metaclass=Singleton): logger.info(f"正在整理 {total_num} 个文件...") # 整理所有文件 - for file_item in file_items: + for file_item, bluray_dir in file_items: if global_vars.is_system_stopped: break file_path = Path(file_item.path) @@ -355,9 +518,6 @@ class TransferChain(ChainBase, metaclass=Singleton): or file_item.path.find('/.') != -1 \ or file_item.path.find('/@eaDir') != -1: logger.debug(f"{file_item.path} 是回收站或隐藏的文件") - # 计数 - processed_num += 1 - skip_num += 1 continue # 整理屏蔽词不处理 @@ -372,10 +532,6 @@ class TransferChain(ChainBase, metaclass=Singleton): break if is_blocked: err_msgs.append(f"{file_item.name} 命中整理屏蔽词") - # 计数 - processed_num += 1 - skip_num += 1 - all_success = False continue # 整理成功的不再处理 @@ -383,17 +539,8 @@ class TransferChain(ChainBase, metaclass=Singleton): transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage) if transferd and transferd.status: logger.info(f"{file_item.path} 已成功整理过,如需重新处理,请删除历史记录。") - # 计数 - processed_num += 1 - skip_num += 1 - all_success = False continue - # 更新进度 - self.progress.update(value=processed_num / total_num * 100, - text=f"正在整理 ({processed_num + 1}/{total_num}){file_item.name} ...", - key=ProgressKey.FileTransfer) - if not meta: # 文件元数据 file_meta = MetaInfoPath(file_path) @@ -407,10 +554,6 @@ class TransferChain(ChainBase, metaclass=Singleton): if not file_meta: logger.error(f"{file_path} 无法识别有效信息") err_msgs.append(f"{file_path} 无法识别有效信息") - # 计数 - processed_num += 1 - fail_num += 1 - all_success = False continue # 自定义识别 @@ -423,9 +566,38 @@ class TransferChain(ChainBase, metaclass=Singleton): if end_ep is not None: file_meta.end_episode = end_ep + # 根据父路径获取下载历史 + download_history = None + if bluray_dir: + # 蓝光原盘,按目录名查询 + download_history = self.downloadhis.get_by_path(str(file_path)) + else: + # 按文件全路径查询 + download_file = self.downloadhis.get_file_by_fullpath(str(file_path)) + if download_file: + download_history = self.downloadhis.get_by_hash(download_file.download_hash) + + # 获取下载Hash + if download_history and (not downloader or not download_hash): + downloader = download_history.downloader + download_hash = download_history.download_hash + if not mediainfo: # 识别媒体信息 - file_mediainfo = self.mediachain.recognize_by_meta(file_meta) + if download_history and (download_history.tmdbid or download_history.doubanid): + # 下载记录中已存在识别信息 + file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type), + tmdbid=download_history.tmdbid, + doubanid=download_history.doubanid) + if mediainfo: + # 更新自定义媒体类别 + if download_history.media_category: + mediainfo.category = download_history.media_category + else: + # 识别媒体信息 + file_mediainfo = self.mediachain.recognize_by_meta(file_meta) + # 更新媒体图片 + self.obtain_images(mediainfo=file_mediainfo) else: file_mediainfo = mediainfo @@ -436,6 +608,7 @@ class TransferChain(ChainBase, metaclass=Singleton): fileitem=file_item, mode=transfer_type, meta=file_meta, + downloader=downloader, download_hash=download_hash ) self.post_message(Notification( @@ -444,10 +617,6 @@ class TransferChain(ChainBase, metaclass=Singleton): text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。", link=settings.MP_DOMAIN('#/history') )) - # 计数 - processed_num += 1 - fail_num += 1 - all_success = False continue # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title @@ -471,12 +640,6 @@ class TransferChain(ChainBase, metaclass=Singleton): else: episodes_info = None - # 获取下载hash - if not download_hash: - download_file = self.downloadhis.get_file_by_fullpath(file_item.path) - if download_file: - download_hash = download_file.download_hash - # 查询整理目标目录 dir_info = None if not target_directory: @@ -498,205 +661,24 @@ class TransferChain(ChainBase, metaclass=Singleton): target_storage=target_storage) # 后台整理 - if background: - self.__handle_transfer(task=TransferTask( - fileitem=file_item, - meta=file_meta, - mediainfo=file_mediainfo, - target_directory=target_directory or dir_info, - target_storage=target_storage, - target_path=target_path, - transfer_type=transfer_type, - episodes_info=episodes_info, - scrape=scrape, - library_type_folder=library_type_folder, - library_category_folder=library_category_folder, - downloader=downloader, - download_hash=download_hash - ), callback=__callback) - continue - - # 实时整理 - transferinfo: TransferInfo = self.transfer(fileitem=file_item, - meta=file_meta, - mediainfo=file_mediainfo, - target_directory=target_directory or dir_info, - target_storage=target_storage, - target_path=target_path, - transfer_type=transfer_type, - episodes_info=episodes_info, - scrape=scrape, - library_type_folder=library_type_folder, - library_category_folder=library_category_folder) - if not transferinfo: - logger.error("文件整理模块运行失败") - return False, "文件整理模块运行失败" - if not transferinfo.success: - # 整理失败 - logger.warn(f"{file_path.name} 入库失败:{transferinfo.message}") - err_msgs.append(f"{file_path.name} {transferinfo.message}") - # 新增整理失败历史记录 - self.transferhis.add_fail( - fileitem=file_item, - mode=transfer_type, - download_hash=download_hash, - meta=file_meta, - mediainfo=file_mediainfo, - transferinfo=transferinfo - ) - # 发送消息 - self.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{file_mediainfo.title_year} {file_meta.season_episode} 入库失败!", - text=f"原因:{transferinfo.message or '未知'}", - image=file_mediainfo.get_message_image(), - link=settings.MP_DOMAIN('#/history') - )) - # 计数 - processed_num += 1 - fail_num += 1 - all_success = False - continue - - # 汇总信息 - current_transfer_type = transferinfo.transfer_type - mkey = (file_mediainfo.tmdb_id, file_meta.begin_season) - if mkey not in medias: - # 新增信息 - medias[mkey] = file_mediainfo - season_episodes[mkey] = file_meta.episode_list - transfers[mkey] = transferinfo - else: - # 合并季集清单 - season_episodes[mkey] = list(set(season_episodes[mkey] + file_meta.episode_list)) - # 合并整理数据 - transfers[mkey].file_count += transferinfo.file_count - transfers[mkey].total_size += transferinfo.total_size - transfers[mkey].file_list.extend(transferinfo.file_list) - transfers[mkey].file_list_new.extend(transferinfo.file_list_new) - transfers[mkey].fail_list.extend(transferinfo.fail_list) - - # 新增整理成功历史记录 - self.transferhis.add_success( + self.__handle_transfer(task=TransferTask( fileitem=file_item, - mode=transfer_type or transferinfo.transfer_type, - download_hash=download_hash, meta=file_meta, mediainfo=file_mediainfo, - transferinfo=transferinfo - ) - - # 整理完成事件 - self.eventmanager.send_event(EventType.TransferComplete, { - 'meta': file_meta, - 'mediainfo': file_mediainfo, - 'transferinfo': transferinfo, - 'downloader': downloader, - 'download_hash': download_hash, - }) - - # 更新进度 - processed_num += 1 - self.progress.update(value=processed_num / total_num * 100, - text=f"{file_path.name} 整理完成", - key=ProgressKey.FileTransfer) - - # 目录或文件整理完成 - self.progress.update(text=f"{fileitem.path} 整理完成,正在执行后续处理 ...", - key=ProgressKey.FileTransfer) - - # 执行后续处理 - for mkey, media in medias.items(): - transfer_info = transfers[mkey] - transfer_meta = MetaInfo(transfer_info.target_diritem.name) - transfer_meta.begin_season = mkey[1] - # 发送通知 - if transfer_info.need_notify: - se_str = None - if media.type == MediaType.TV: - se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}" - self.send_transfer_message(meta=transfer_meta, - mediainfo=media, - transferinfo=transfer_info, - season_episode=se_str) - # 刮削事件 - if scrape or transfer_info.need_scrape: - self.eventmanager.send_event(EventType.MetadataScrape, { - 'meta': transfer_meta, - 'mediainfo': media, - 'fileitem': transfer_info.target_diritem - }) - - # 移动模式处理 - if all_success and current_transfer_type in ["move"]: - # 下载器hash - if download_hash: - if self.remove_torrents(download_hash, downloader=downloader): - logger.info(f"移动模式删除种子成功:{download_hash} ") - # 删除残留目录 - if fileitem: - self.storagechain.delete_media_file(fileitem, delete_self=False) - - # 结束进度 - logger.info(f"{fileitem.path} 整理完成,共 {total_num} 个文件," - f"失败 {fail_num} 个,跳过 {skip_num} 个") - - self.progress.update(value=100, - text=f"{fileitem.path} 整理完成,共 {total_num} 个文件," - f"失败 {fail_num} 个,跳过 {skip_num} 个", - key=ProgressKey.FileTransfer) - # 结速进度 - self.progress.end(ProgressKey.FileTransfer) + target_directory=target_directory or dir_info, + target_storage=target_storage, + target_path=target_path, + transfer_type=transfer_type, + episodes_info=episodes_info, + scrape=scrape, + library_type_folder=library_type_folder, + library_category_folder=library_category_folder, + downloader=downloader, + download_hash=download_hash + )) return True, "\n".join(err_msgs) - def __get_trans_fileitems(self, fileitem: FileItem): - """ - 获取整理目录或文件列表 - """ - - file_path = Path(fileitem.path) - - if fileitem.storage == "local" and not file_path.exists(): - logger.warn(f"目录不存在:{fileitem.path}") - return [] - - # 单文件 - if fileitem.type == "file": - return [fileitem] - - # 蓝光原盘 - if fileitem.storage == "local" and SystemUtils.is_bluray_dir(file_path): - return [fileitem] - - # 需要整理的文件项列表 - trans_items = [] - - # 先检查当前目录的下级目录,以支持合集的情况 - for sub_dir in self.storagechain.list_files(fileitem): - subfile_path = Path(sub_dir.path) - # 添加蓝光原盘 - if sub_dir.storage == "local" \ - and sub_dir.type == "dir" \ - and SystemUtils.is_bluray_dir(subfile_path): - trans_items.append(sub_dir) - # 添加目录 - elif sub_dir.type == "dir": - trans_items.append(sub_dir) - - if not trans_items: - # 没有有效子目录,直接整理当前目录 - trans_items.append(fileitem) - else: - # 有子目录时,把当前目录的文件添加到整理任务中 - sub_items = self.storagechain.list_files(fileitem) - if sub_items: - sub_files = [f for f in sub_items if f.type == "file"] - if sub_files: - trans_items.extend(sub_files) - - return trans_items - def remote_transfer(self, arg_str: str, channel: MessageChannel, userid: Union[str, int] = None, source: str = None): """ @@ -778,10 +760,10 @@ class TransferChain(ChainBase, metaclass=Singleton): # 强制整理 if history.src_fileitem: - state, errmsg = self.__do_transfer(fileitem=FileItem(**history.src_fileitem), - mediainfo=mediainfo, - download_hash=history.download_hash, - force=True) + state, errmsg = self.do_transfer(fileitem=FileItem(**history.src_fileitem), + mediainfo=mediainfo, + download_hash=history.download_hash, + force=True) if not state: return False, errmsg @@ -835,7 +817,7 @@ class TransferChain(ChainBase, metaclass=Singleton): text=f"开始整理 {fileitem.path} ...", key=ProgressKey.FileTransfer) # 开始整理 - state, errmsg = self.__do_transfer( + state, errmsg = self.do_transfer( fileitem=fileitem, target_storage=target_storage, target_path=target_path, @@ -857,17 +839,17 @@ class TransferChain(ChainBase, metaclass=Singleton): return True, "" else: # 没有输入TMDBID时,按文件识别 - state, errmsg = self.__do_transfer(fileitem=fileitem, - target_storage=target_storage, - target_path=target_path, - transfer_type=transfer_type, - season=season, - epformat=epformat, - min_filesize=min_filesize, - scrape=scrape, - library_type_folder=library_type_folder, - library_category_folder=library_category_folder, - force=force) + state, errmsg = self.do_transfer(fileitem=fileitem, + target_storage=target_storage, + target_path=target_path, + transfer_type=transfer_type, + season=season, + epformat=epformat, + min_filesize=min_filesize, + scrape=scrape, + library_type_folder=library_type_folder, + library_category_folder=library_category_folder, + force=force) return state, errmsg def send_transfer_message(self, meta: MetaBase, mediainfo: MediaInfo, diff --git a/app/db/transferhistory_oper.py b/app/db/transferhistory_oper.py index 88b02fb2..3b25f1c8 100644 --- a/app/db/transferhistory_oper.py +++ b/app/db/transferhistory_oper.py @@ -120,7 +120,7 @@ class TransferHistoryOper(DbOper): def add_success(self, fileitem: FileItem, mode: str, meta: MetaBase, mediainfo: MediaInfo, transferinfo: TransferInfo, - download_hash: str = None): + downloader: str = None, download_hash: str = None): """ 新增转移成功历史记录 """ @@ -143,13 +143,14 @@ class TransferHistoryOper(DbOper): seasons=meta.season, episodes=meta.episode, image=mediainfo.get_poster_image(), + downloader=downloader, download_hash=download_hash, status=1, files=transferinfo.file_list ) def add_fail(self, fileitem: FileItem, mode: str, meta: MetaBase, mediainfo: MediaInfo = None, - transferinfo: TransferInfo = None, download_hash: str = None): + transferinfo: TransferInfo = None, downloader: str = None, download_hash: str = None): """ 新增转移失败历史记录 """ @@ -173,6 +174,7 @@ class TransferHistoryOper(DbOper): seasons=meta.season, episodes=meta.episode, image=mediainfo.get_poster_image(), + downloader=downloader, download_hash=download_hash, status=0, errmsg=transferinfo.message or '未知错误', @@ -188,6 +190,7 @@ class TransferHistoryOper(DbOper): mode=mode, seasons=meta.season, episodes=meta.episode, + downloader=downloader, download_hash=download_hash, status=0, errmsg="未识别到媒体信息" diff --git a/app/monitor.py b/app/monitor.py index 99ce636e..8c34c9bf 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -1,6 +1,4 @@ -import datetime import platform -import re import threading import traceback from pathlib import Path @@ -17,19 +15,14 @@ from app.chain.storage import StorageChain from app.chain.tmdb import TmdbChain from app.chain.transfer import TransferChain from app.core.config import settings -from app.core.context import MediaInfo -from app.core.event import EventManager -from app.core.metainfo import MetaInfoPath from app.db.downloadhistory_oper import DownloadHistoryOper from app.db.systemconfig_oper import SystemConfigOper from app.db.transferhistory_oper import TransferHistoryOper from app.helper.directory import DirectoryHelper from app.helper.message import MessageHelper from app.log import logger -from app.schemas import FileItem, TransferInfo, Notification, TransferTask -from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType +from app.schemas import FileItem, TransferTask from app.utils.singleton import Singleton -from app.utils.string import StringUtils lock = Lock() snapshot_lock = Lock() @@ -76,12 +69,6 @@ class Monitor(metaclass=Singleton): # 存储过照间隔(分钟) _snapshot_interval = 5 - # 消息汇总 - _msg_medias = {} - - # 消息汇总间隔(秒) - _msg_interval = 60 - def __init__(self): super().__init__() self.chain = MonitorChain() @@ -166,9 +153,6 @@ class Monitor(metaclass=Singleton): 'storage': mon_dir.storage, 'mon_path': mon_path }) - - # 追加入库消息统一发送服务 - self._scheduler.add_job(self.__send_msg, trigger='interval', seconds=15) # 启动定时服务 if self._scheduler.get_jobs(): self._scheduler.print_jobs() @@ -231,74 +215,8 @@ class Monitor(metaclass=Singleton): """ 添加到整理队列 """ - - def __callback(_task: TransferTask, _transferinfo: TransferInfo, /): - """ - 整理完成后处理 - """ - if not _transferinfo: - logger.error("文件转移模块运行失败") - return - - if not _transferinfo.success: - # 转移失败 - logger.warn(f"{_task.file_path.name} 入库失败:{_transferinfo.message}") - # 新增转移失败历史记录 - self.transferhis.add_fail( - fileitem=_task.fileitem, - mode=_transferinfo.transfer_type if _transferinfo else '', - download_hash=_task.download_hash, - meta=_task.meta, - mediainfo=_task.mediainfo, - transferinfo=_transferinfo - ) - # 发送失败消息 - self.chain.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{_task.mediainfo.title_year} {_task.meta.season_episode} 入库失败!", - text=f"原因:{_transferinfo.message or '未知'}", - image=_task.mediainfo.get_message_image(), - link=settings.MP_DOMAIN('#/history') - )) - return - - # 转移成功 - logger.info(f"{_task.file_path.name} 入库成功:{_transferinfo.target_diritem.path}") - # 新增转移成功历史记录 - self.transferhis.add_success( - fileitem=_task.fileitem, - mode=_transferinfo.transfer_type if _transferinfo else '', - download_hash=_task.download_hash, - meta=_task.meta, - mediainfo=_task.mediainfo, - transferinfo=_transferinfo - ) - - # 汇总刮削 - if _transferinfo.need_scrape: - self.mediaChain.scrape_metadata(fileitem=_transferinfo.target_diritem, - meta=_task.meta, - mediainfo=_task.mediainfo) - - # 广播事件 - EventManager().send_event(EventType.TransferComplete, { - 'fileitem': _task.fileitem, - 'meta': _task.meta, - 'mediainfo': _task.mediainfo, - 'transferinfo': _transferinfo - }) - - # 发送消息汇总 - if _transferinfo.need_notify: - self.__collect_msg_medias(mediainfo=_task.mediainfo, file_meta=_task.meta, - transferinfo=_transferinfo) - - # 移动模式删除空目录 - if _transferinfo.transfer_type in ["move"]: - self.storagechain.delete_media_file(_task.fileitem, delete_self=False) - - # 加入整理队列 - self.transferchain.put_to_queue(task=task, callback=__callback) + # 加入整理队列,使用默认的回调函数 + self.transferchain.put_to_queue(task=task) def __handle_file(self, storage: str, event_path: Path): """ @@ -306,271 +224,25 @@ class Monitor(metaclass=Singleton): :param storage: 存储 :param event_path: 事件文件路径 """ - - def __get_bluray_dir(_path: Path): - """ - 获取BDMV目录的上级目录 - """ - for p in _path.parents: - if p.name == "BDMV": - return p.parent - return None - # 全程加锁 with lock: try: - # 回收站及隐藏的文件不处理 - if str(event_path).find('/@Recycle/') != -1 \ - or str(event_path).find('/#recycle/') != -1 \ - or str(event_path).find('/.') != -1 \ - or str(event_path).find('/@eaDir') != -1: - logger.debug(f"{event_path} 是回收站或隐藏的文件") - return - - # 不是媒体文件不处理 - if event_path.suffix.lower() not in self.all_exts: - logger.debug(f"{event_path} 不是媒体文件") - return - - # 整理屏蔽词不处理 - transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords) - if transfer_exclude_words: - for keyword in transfer_exclude_words: - if not keyword: - continue - if keyword and re.search(r"%s" % keyword, str(event_path), re.IGNORECASE): - logger.info(f"{event_path} 命中整理屏蔽词 {keyword},不处理") - return - - # 判断是不是蓝光目录 - bluray_flag = False - if re.search(r"BDMV[/\\]STREAM", str(event_path), re.IGNORECASE): - bluray_flag = True - # 截取BDMV前面的路径 - event_path = __get_bluray_dir(event_path) - logger.info(f"{event_path} 是蓝光原盘目录,更正文件路径为:{event_path}") - - # 查询历史记录,已转移的不处理 - if self.transferhis.get_by_src(str(event_path), storage=storage): - logger.info(f"{event_path} 已经整理过了") - return - - # 元数据 - file_meta = MetaInfoPath(event_path) - if not file_meta.name: - logger.error(f"{event_path.name} 无法识别有效信息") - return - - # 根据父路径获取下载历史 - download_history = None - if bluray_flag: - # 蓝光原盘,按目录名查询 - download_history = self.downloadhis.get_by_path(str(event_path)) - else: - # 按文件全路径查询 - download_file = self.downloadhis.get_file_by_fullpath(str(event_path)) - if download_file: - download_history = self.downloadhis.get_by_hash(download_file.download_hash) - - # 获取下载Hash - downloader, download_hash = None, None - if download_history: - downloader = download_history.downloader - download_hash = download_history.download_hash - - # 识别媒体信息 - if download_history and (download_history.tmdbid or download_history.doubanid): - # 下载记录中已存在识别信息 - mediainfo: MediaInfo = self.mediaChain.recognize_media(mtype=MediaType(download_history.type), - tmdbid=download_history.tmdbid, - doubanid=download_history.doubanid) - if mediainfo: - # 更新自定义媒体类别 - if download_history.media_category: - mediainfo.category = download_history.media_category - - else: - mediainfo: MediaInfo = self.mediaChain.recognize_by_meta(file_meta) - if not mediainfo: - logger.warn(f'未识别到媒体信息,标题:{file_meta.name}') - # 新增转移失败历史记录 - his = self.transferhis.add_fail( - fileitem=FileItem( - storage=storage, - type="file", - path=str(event_path), - name=event_path.name, - basename=event_path.stem, - extension=event_path.suffix[1:], - ), - mode='', - meta=file_meta, - download_hash=download_hash - ) - self.chain.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{event_path.name} 未识别到媒体信息,无法入库!", - text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别转移。", - link=settings.MP_DOMAIN('#/history') - )) - return - - # 查询转移目的目录 - dir_info = self.directoryhelper.get_dir(mediainfo, storage=storage, src_path=event_path) - if not dir_info: - logger.warn(f"{event_path.name} 未找到对应的目标目录") - return - - # 查找这个文件项 - file_item = self.storagechain.get_file_item(storage=storage, path=event_path) - if not file_item: - logger.warn(f"{event_path.name} 未找到对应的文件") - return - - # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title - if not settings.SCRAP_FOLLOW_TMDB: - transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, - mtype=mediainfo.type.value) - if transfer_history: - mediainfo.title = transfer_history.title - logger.info(f"{event_path.name} 识别为:{mediainfo.type.value} {mediainfo.title_year}") - - # 更新媒体图片 - self.chain.obtain_images(mediainfo=mediainfo) - - # 获取集数据 - if mediainfo.type == MediaType.TV: - episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id, - season=file_meta.begin_season or 1) - else: - episodes_info = None - - # 进入队列 - self.__transfer_queue( - TransferTask( - fileitem=file_item, - file_path=event_path, - meta=file_meta, - mediainfo=mediainfo, - target_directory=dir_info, - episodes_info=episodes_info, - downloader=downloader, - download_hash=download_hash - ) + # 开始整理 + # TODO 缺少文件大小 + self.transferchain.do_transfer( + fileitem=FileItem( + storage=storage, + path=str(event_path), + type="file", + name=event_path.name, + basename=event_path.stem, + extension=event_path.suffix[1:] + ), + src_match=True ) - except Exception as e: logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) - def __collect_msg_medias(self, mediainfo: MediaInfo, file_meta: MetaInfoPath, transferinfo: TransferInfo): - """ - 收集媒体处理完的消息 - """ - media_list = self._msg_medias.get(mediainfo.title_year + " " + file_meta.season) or {} - if media_list: - media_files = media_list.get("files") or [] - if media_files: - file_exists = False - for file in media_files: - if str(transferinfo.fileitem.path) == file.get("path"): - file_exists = True - break - if not file_exists: - media_files.append({ - "path": str(transferinfo.fileitem.path), - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - }) - else: - media_files = [ - { - "path": str(transferinfo.fileitem.path), - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - } - ] - media_list = { - "files": media_files, - "time": datetime.datetime.now() - } - else: - media_list = { - "files": [ - { - "path": str(transferinfo.fileitem.path), - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - } - ], - "time": datetime.datetime.now() - } - self._msg_medias[mediainfo.title_year + " " + file_meta.season] = media_list - - def __send_msg(self): - """ - 定时检查是否有媒体处理完,发送统一消息 - """ - if not self._msg_medias or not self._msg_medias.keys(): - return - - # 遍历检查是否已刮削完,发送消息 - for medis_title_year_season in list(self._msg_medias.keys()): - media_list = self._msg_medias.get(medis_title_year_season) - logger.info(f"开始处理媒体 {medis_title_year_season} 消息") - - if not media_list: - continue - - # 获取最后更新时间 - last_update_time = media_list.get("time") - media_files = media_list.get("files") - if not last_update_time or not media_files: - continue - - transferinfo = media_files[0].get("transferinfo") - file_meta = media_files[0].get("file_meta") - mediainfo = media_files[0].get("mediainfo") - # 判断剧集最后更新时间距现在是已超过10秒或者电影,发送消息 - if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._msg_interval) \ - or mediainfo.type == MediaType.MOVIE: - - # 汇总处理文件总大小 - total_size = 0 - file_count = 0 - - # 剧集汇总 - episodes = [] - for file in media_files: - transferinfo = file.get("transferinfo") - total_size += transferinfo.total_size - file_count += 1 - - file_meta = file.get("file_meta") - if file_meta and file_meta.begin_episode: - episodes.append(file_meta.begin_episode) - - transferinfo.total_size = total_size - # 汇总处理文件数量 - transferinfo.file_count = file_count - - # 剧集季集信息 S01 E01-E04 || S01 E01、E02、E04 - season_episode = None - # 处理文件多,说明是剧集,显示季入库消息 - if mediainfo.type == MediaType.TV: - # 季集文本 - season_episode = f"{file_meta.season} {StringUtils.format_ep(episodes)}" - # 发送消息 - self.transferchain.send_transfer_message(meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo, - season_episode=season_episode) - # 发送完消息,移出key - del self._msg_medias[medis_title_year_season] - continue - def stop(self): """ 退出插件