From e3552d40862a9a9e87df68a9af64a825583b5c9e Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 27 Dec 2024 17:45:04 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E8=AF=86=E5=88=AB=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=90=8E=E5=8F=B0=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/transfer.py | 336 ++++++++++++++++++++++++---------------- app/schemas/history.py | 2 + app/schemas/transfer.py | 15 +- 3 files changed, 216 insertions(+), 137 deletions(-) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index d5b61755..f6fdf21f 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -70,7 +70,10 @@ class JobManager: 获取作业ID """ if task: - return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season + if task.mediainfo: + return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season + else: + return task.meta.name, task.meta.begin_season else: return media.tmdb_id or media.douban_id, season @@ -79,9 +82,20 @@ class JobManager: """ 获取媒体信息 """ - mediainfo = deepcopy(task.mediainfo) - mediainfo.clear() - return schemas.MediaInfo(**mediainfo.to_dict()) + if task.mediainfo: + # 有媒体信息 + mediainfo = deepcopy(task.mediainfo) + mediainfo.clear() + return schemas.MediaInfo(**mediainfo.to_dict()) + else: + # 没有媒体信息 + meta: MetaBase = task.meta + return schemas.MediaInfo( + title=meta.name, + year=meta.year, + title_year=f"{meta.name} ({meta.year})", + type=meta.type.value if meta.type else None + ) @staticmethod def __get_meta(task: TransferTask) -> schemas.MetaInfo: @@ -90,11 +104,11 @@ class JobManager: """ return schemas.MetaInfo(**task.meta.to_dict()) - def add_task(self, task: TransferTask): + def add_task(self, task: TransferTask, state: str = "waiting"): """ 添加整理任务 """ - if not any([task, task.mediainfo, task.meta, task.fileitem]): + if not any([task, task.meta, task.fileitem]): return with job_lock: __mediaid__ = self.__get_id(task) @@ -103,14 +117,14 @@ class JobManager: "media": self.__get_media(task), "season": task.meta.begin_season, "tasks": [{ - "state": "waiting", + "state": state, "fileitem": task.fileitem, "meta": self.__get_meta(task) }] } else: self._job_view[__mediaid__]["tasks"].append({ - "state": "waiting", + "state": state, "fileitem": task.fileitem, "meta": self.__get_meta(task) }) @@ -153,8 +167,6 @@ class JobManager: """ 任务失败 """ - if not task or not task.mediainfo or not task.meta: - return with job_lock: __mediaid__ = self.__get_id(task) if __mediaid__ not in self._job_view: @@ -170,7 +182,7 @@ class JobManager: set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list) ) - def remove_task(self, fileitem: FileItem): + def remove_task(self, fileitem: FileItem) -> Optional[Dict[str, Any]]: """ 移除整理任务 """ @@ -178,7 +190,7 @@ class JobManager: for mediaid in list(self._job_view): job = self._job_view[mediaid] for task in job["tasks"]: - if task["fileitem"].path == fileitem.path and task["fileitem"].storage == fileitem.storage: + if task["fileitem"] == fileitem: job["tasks"].remove(task) # 如果没有作业了,则移除作业 if not job["tasks"]: @@ -188,14 +200,12 @@ class JobManager: self._season_episodes[mediaid] = list( set(self._season_episodes[mediaid]) - set(task["meta"].episode_list) ) - break + return task def remove_job(self, media: MediaInfo, season: int = None) -> Optional[Dict[str, Any]]: """ 移除作业 """ - if not media: - return __mediaid__ = self.__get_id(media=media, season=season) with job_lock: # 移除作业 @@ -205,17 +215,45 @@ class JobManager: self._season_episodes.pop(__mediaid__) return self._job_view.pop(__mediaid__) - def is_finished(self, media: MediaInfo, season: int = None) -> bool: + def is_done(self, media: MediaInfo, season: int = None) -> bool: """ - 任务检查某项任务是否全部为已完成状态 + 检查某项作业是否整理完成(不管成功还是失败) """ - if not media: - return False __mediaid__ = self.__get_id(media=media, season=season) if __mediaid__ not in self._job_view: return False with job_lock: - return all([job["state"] in ["completed", "failed"] for job in self._job_view[__mediaid__]["tasks"]]) + return all( + [task["state"] in ["completed", "failed"] for task in self._job_view[__mediaid__]["tasks"]] + ) + + def is_finished(self, media: MediaInfo, season: int = None) -> bool: + """ + 检查某项作业是否已完成且有成功的记录 + """ + __mediaid__ = self.__get_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return False + with job_lock: + # 所有任务都是完成或者失败状态,且有完成的记录 + tasks = self._job_view[__mediaid__]["tasks"] + return True if all( + [task["state"] in ["completed", "failed"] for task in tasks] + ) and any( + [task["state"] == "completed" for task in tasks] + ) else False + + def is_success(self, media: MediaInfo, season: int = None) -> bool: + """ + 检查某项作业是否全部成功 + """ + __mediaid__ = self.__get_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return False + with job_lock: + return all( + [task["state"] in ["completed"] for task in self._job_view[__mediaid__]["tasks"]] + ) def count(self, media: MediaInfo, season: int = None) -> int: """ @@ -233,7 +271,7 @@ class JobManager: 获取所有task任务总数 """ with job_lock: - return sum([len(jobs) for jobs in self._job_view.values()]) + return sum([len(job["tasks"]) for job in self._job_view.values()]) def list_jobs(self) -> List[dict]: """ @@ -245,8 +283,6 @@ class JobManager: """ 获取季集清单 """ - if not media: - return [] __mediaid__ = self.__get_id(media=media, season=season) with job_lock: return self._season_episodes.get(__mediaid__) or [] @@ -347,8 +383,8 @@ class TransferChain(ChainBase, metaclass=Singleton): 'download_hash': task.download_hash, }) - # 整理完成一个媒体项时 - if self.jobview.is_finished(task.mediainfo, task.meta.begin_season): + # 全部整理成功时 + if self.jobview.is_success(task.mediainfo, task.meta.begin_season): # 移动模式删除空目录 if transferinfo.transfer_type in ["move"]: # 下载器hash @@ -359,6 +395,8 @@ class TransferChain(ChainBase, metaclass=Singleton): if task.fileitem: self.storagechain.delete_media_file(task.fileitem, delete_self=False) + # 整理完成且有成功的任务时 + if self.jobview.is_finished(task.mediainfo, task.meta.begin_season): # 发送通知 if transferinfo.need_notify: se_str = None @@ -425,58 +463,60 @@ class TransferChain(ChainBase, metaclass=Singleton): try: item: TransferQueue = self._queue.get(timeout=self._transfer_interval) if item: - callback = item.callback task = item.task if not task: continue - mediainfo = task.mediainfo - meta = task.meta - fileitem = task.fileitem # 正在处理 self.jobview.running_task(task) + # 文件信息 + fileitem = task.fileitem + # 开始新队列 if __queue_start: logger.info("开始整理队列处理...") # 启动进度 self.progress.start(ProgressKey.FileTransfer) + # 重置计数 + processed_num = 0 + fail_num = 0 total_num = self.jobview.total() - __process_msg = f"开始整理队列处理,共 {total_num} 个文件或子目录 ..." + __process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..." logger.info(__process_msg) self.progress.update(value=0, text=__process_msg, key=ProgressKey.FileTransfer) # 队列已开始 __queue_start = False - # 重新计算总数 - total_num = self.jobview.total() # 更新进度 - __process_msg = f"正在整理 ({processed_num + 1}/{total_num}){fileitem.name} ..." + __process_msg = f"正在整理 {fileitem.name} ..." logger.info(__process_msg) self.progress.update(value=processed_num / total_num * 100, text=__process_msg, key=ProgressKey.FileTransfer) # 整理 - state, err_msg = self.__handle_transfer(task=task, callback=callback) - if state: - # 任务成功 - processed_num += 1 - else: + state, err_msg = self.__handle_transfer(task=task, callback=item.callback) + if not state: # 任务失败 - logger.warn(f"{fileitem.name} 整理失败:{err_msg}") fail_num += 1 + # 更新进度 + processed_num += 1 + __process_msg = f"{fileitem.name} 整理完成" + logger.info(__process_msg) + self.progress.update(value=processed_num / total_num * 100, + text=__process_msg, + key=ProgressKey.FileTransfer) # 移除已完成的任务 - if self.jobview.is_finished(mediainfo, meta.begin_season): - self.jobview.remove_job(mediainfo, meta.begin_season) + if self.jobview.is_done(task.mediainfo, task.meta.begin_season): + self.jobview.remove_job(task.mediainfo, task.meta.begin_season) except queue.Empty: if not __queue_start: # 结束进度 - __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" + __end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num} 个" logger.info(__end_msg) self.progress.update(value=100, text=__end_msg, key=ProgressKey.FileTransfer) self.progress.end(ProgressKey.FileTransfer) # 重置计数 - total_num = 0 processed_num = 0 fail_num = 0 # 标记为新队列 @@ -490,6 +530,85 @@ class TransferChain(ChainBase, metaclass=Singleton): """ 处理整理任务 """ + # 识别 + if not task.mediainfo: + download_history = task.download_history + # 识别媒体信息 + if download_history and (download_history.tmdbid or download_history.doubanid): + # 下载记录中已存在识别信息 + mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type), + tmdbid=download_history.tmdbid, + doubanid=download_history.doubanid) + if mediainfo: + # 更新自定义媒体类别 + if download_history.media_category: + mediainfo.category = download_history.media_category + else: + # 识别媒体信息 + mediainfo = self.mediachain.recognize_by_meta(task.meta) + # 更新媒体图片 + if mediainfo: + self.obtain_images(mediainfo=mediainfo) + if not mediainfo: + # 新增整理失败历史记录 + his = self.transferhis.add_fail( + fileitem=task.fileitem, + mode=task.transfer_type, + meta=task.meta, + downloader=task.downloader, + download_hash=task.download_hash + ) + self.post_message(Notification( + mtype=NotificationType.Manual, + title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!", + text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。", + link=settings.MP_DOMAIN('#/history') + )) + # 任务失败 + return False, "未识别到媒体信息" + + # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title + if not settings.SCRAP_FOLLOW_TMDB: + transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, + mtype=mediainfo.type.value) + if transfer_history: + mediainfo.title = transfer_history.title + + # 获取集数据 + if not task.episodes_info and mediainfo.type == MediaType.TV: + if task.meta.begin_season is None: + task.meta.begin_season = 1 + mediainfo.season = mediainfo.season or task.meta.begin_season + task.episodes_info = self.tmdbchain.tmdb_episodes( + tmdbid=mediainfo.tmdb_id, + season=mediainfo.season + ) + + # 更新任务信息 + task.mediainfo = mediainfo + # 更新队列任务 + curr_task = self.jobview.remove_task(task.fileitem) + self.jobview.add_task(task, state=curr_task['state'] if curr_task else "waiting") + + # 查询整理目标目录 + if not task.target_directory: + if task.src_match: + # 按源目录匹配,以便找到更合适的目录配置 + task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, + storage=task.fileitem.storage, + src_path=Path(task.fileitem.path), + target_storage=task.target_storage) + elif task.target_path: + # 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配 + task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, + dest_path=task.target_path, + target_storage=task.target_storage) + else: + # 未指定目标路径,根据媒体信息获取目标目录 + task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, + storage=task.fileitem.storage, + target_storage=task.target_storage) + # 执行整理 transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem, meta=task.meta, @@ -775,7 +894,21 @@ class TransferChain(ChainBase, metaclass=Singleton): logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") return False, f"{fileitem.name} 没有找到可整理的媒体文件" - logger.info(f"正在计划整理 {len(file_items)} 个文件...") + # 总数量 + total_num = len(file_items) + # 已处理数量 + processed_num = 0 + # 失败数量 + fail_num = 0 + logger.info(f"正在计划整理 {total_num} 个文件...") + if not background: + # 启动进度 + self.progress.start(ProgressKey.FileTransfer) + __process_msg = f"开始整理,共 {total_num} 个文件 ..." + logger.info(__process_msg) + self.progress.update(value=0, + text=__process_msg, + key=ProgressKey.FileTransfer) # 整理所有文件 for file_item, bluray_dir in file_items: @@ -801,17 +934,27 @@ class TransferChain(ChainBase, metaclass=Singleton): is_blocked = True break if is_blocked: + fail_num += 1 continue # 整理成功的不再处理 if not force: transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage) - if transferd and transferd.status: + if transferd: all_success = False logger.info(f"{file_item.path} 已整理过,如需重新处理,请删除整理记录。") err_msgs.append(f"{file_item.name} 已整理过") + fail_num += 1 continue + # 更新进度 + if not background: + __process_msg = f"正在整理 ({processed_num + 1}/{total_num}){file_item.name} ..." + logger.info(__process_msg) + self.progress.update(value=processed_num / total_num * 100, + text=__process_msg, + key=ProgressKey.FileTransfer) + if not meta: # 文件元数据 file_meta = MetaInfoPath(file_path) @@ -826,6 +969,7 @@ class TransferChain(ChainBase, metaclass=Singleton): all_success = False logger.error(f"{file_path.name} 无法识别有效信息") err_msgs.append(f"{file_path.name} 无法识别有效信息") + fail_num += 1 continue # 自定义识别 @@ -854,102 +998,22 @@ class TransferChain(ChainBase, metaclass=Singleton): downloader = download_history.downloader download_hash = download_history.download_hash - if not mediainfo: - # 识别媒体信息 - if download_history and (download_history.tmdbid or download_history.doubanid): - # 下载记录中已存在识别信息 - file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type), - tmdbid=download_history.tmdbid, - doubanid=download_history.doubanid) - if file_mediainfo: - # 更新自定义媒体类别 - if download_history.media_category: - file_mediainfo.category = download_history.media_category - else: - # 识别媒体信息 - file_mediainfo = self.mediachain.recognize_by_meta(file_meta) - # 更新媒体图片 - if file_mediainfo: - self.obtain_images(mediainfo=file_mediainfo) - else: - file_mediainfo = mediainfo - - if not file_mediainfo: - all_success = False - logger.warn(f'{file_path.name} 未识别到媒体信息') - err_msgs.append(f"{file_path.name} 未识别到媒体信息") - # 新增整理失败历史记录 - his = self.transferhis.add_fail( - fileitem=file_item, - mode=transfer_type, - meta=file_meta, - downloader=downloader, - download_hash=download_hash - ) - self.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{file_path.name} 未识别到媒体信息,无法入库!", - text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。", - link=settings.MP_DOMAIN('#/history') - )) - continue - - # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title - if not settings.SCRAP_FOLLOW_TMDB: - transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=file_mediainfo.tmdb_id, - mtype=file_mediainfo.type.value) - if transfer_history: - file_mediainfo.title = transfer_history.title - - logger.info(f"{file_path.name} 识别为:{file_mediainfo.type.value} {file_mediainfo.title_year}") - - # 获取集数据 - if file_mediainfo.type == MediaType.TV: - if file_meta.begin_season is None: - file_meta.begin_season = 1 - file_mediainfo.season = file_mediainfo.season or file_meta.begin_season - episodes_info = self.tmdbchain.tmdb_episodes( - tmdbid=file_mediainfo.tmdb_id, - season=file_mediainfo.season - ) - else: - episodes_info = None - - # 查询整理目标目录 - dir_info = None - if not target_directory: - if src_match: - # 按源目录匹配,以便找到更合适的目录配置 - dir_info = self.directoryhelper.get_dir(media=file_mediainfo, - storage=file_item.storage, - src_path=file_path, - target_storage=target_storage) - elif target_path: - # 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配 - dir_info = self.directoryhelper.get_dir(media=file_mediainfo, - dest_path=target_path, - target_storage=target_storage) - else: - # 未指定目标路径,根据媒体信息获取目标目录 - dir_info = self.directoryhelper.get_dir(file_mediainfo, - storage=file_item.storage, - target_storage=target_storage) - # 后台整理 transfer_task = TransferTask( fileitem=file_item, meta=file_meta, - mediainfo=file_mediainfo, - target_directory=target_directory or dir_info, + mediainfo=mediainfo, + target_directory=target_directory, target_storage=target_storage, target_path=target_path, transfer_type=transfer_type, - episodes_info=episodes_info, + src_match=src_match, scrape=scrape, library_type_folder=library_type_folder, library_category_folder=library_category_folder, downloader=downloader, - download_hash=download_hash + download_hash=download_hash, + download_history=download_history ) if background: self.put_to_queue( @@ -965,6 +1029,18 @@ class TransferChain(ChainBase, metaclass=Singleton): all_success = False logger.warn(f"{file_path.name} {err_msg}") err_msgs.append(f"{file_path.name} {err_msg}") + fail_num += 1 + # 完成计数 + processed_num += 1 + + # 整理结束 + if not background: + __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" + logger.info(__end_msg) + self.progress.update(value=100, + text=__end_msg, + key=ProgressKey.FileTransfer) + self.progress.end(ProgressKey.FileTransfer) return all_success, ",".join(err_msgs) diff --git a/app/schemas/history.py b/app/schemas/history.py index da655ca7..c2ec5e87 100644 --- a/app/schemas/history.py +++ b/app/schemas/history.py @@ -46,6 +46,8 @@ class DownloadHistory(BaseModel): date: Optional[str] = None # 备注 note: Optional[Any] = None + # 自定义媒体类别 + media_category: Optional[str] = None class Config: orm_mode = True diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py index 975aeb4b..1a4dd980 100644 --- a/app/schemas/transfer.py +++ b/app/schemas/transfer.py @@ -3,7 +3,7 @@ from typing import Optional, List, Any, Callable from pydantic import BaseModel, Field -from app.schemas import TmdbEpisode +from app.schemas import TmdbEpisode, DownloadHistory from app.schemas.file import FileItem from app.schemas.system import TransferDirectoryConf @@ -46,20 +46,21 @@ class TransferTask(BaseModel): """ 文件整理任务 """ - fileitem: Optional[FileItem] = None - file_path: Optional[Path] = None - meta: Optional[Any] = None + fileitem: FileItem = None + meta: Any = None mediainfo: Optional[Any] = None target_directory: Optional[TransferDirectoryConf] = None target_storage: Optional[str] = None target_path: Optional[Path] = None transfer_type: Optional[str] = None - scrape: Optional[bool] = None - library_type_folder: Optional[bool] = None - library_category_folder: Optional[bool] = None + src_match: Optional[bool] = False + scrape: Optional[bool] = False + library_type_folder: Optional[bool] = False + library_category_folder: Optional[bool] = False episodes_info: Optional[List[TmdbEpisode]] = None downloader: Optional[str] = None download_hash: Optional[str] = None + download_history: Optional[DownloadHistory] = None def to_dict(self): """