diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 40aafcf1..2e24645c 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -151,7 +151,7 @@ class JobManager: def running_task(self, task: TransferTask): """ - 任务运行中 + 设置任务为运行中 """ with job_lock: __mediaid__ = self.__get_id(task) @@ -165,7 +165,7 @@ class JobManager: def finish_task(self, task: TransferTask): """ - 任务完成 + 设置任务为完成 """ with job_lock: __mediaid__ = self.__get_id(task) @@ -179,7 +179,7 @@ class JobManager: def fail_task(self, task: TransferTask): """ - 任务失败 + 设置任务为失败 """ with job_lock: __mediaid__ = self.__get_id(task) @@ -198,7 +198,7 @@ class JobManager: def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]: """ - 移除所有作业中的整理任务 + 移除任务 """ with job_lock: for mediaid in list(self._job_view): @@ -233,7 +233,7 @@ class JobManager: def is_done(self, task: TransferTask) -> bool: """ - 检查某项作业是否整理完成(不管成功还是失败) + 检查任务对应的作业是否整理完成(不管成功还是失败) """ __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) @@ -256,7 +256,7 @@ class JobManager: def is_finished(self, task: TransferTask) -> bool: """ - 检查某项作业是否已完成且有成功的记录 + 检查任务对应的作业是否已完成且有成功的记录 """ __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) @@ -282,7 +282,7 @@ class JobManager: def is_success(self, task: TransferTask) -> bool: """ - 检查某项作业是否全部成功 + 检查任务对应的作业是否全部成功 """ __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) @@ -303,6 +303,18 @@ class JobManager: media_success = True return meta_success and media_success + def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool: + """ + 判断是否有任务正在处理 + """ + if mediainfo: + __mediaid__ = self.__get_media_id(media=meta, season=season) + if __mediaid__ in self._job_view: + return True + + __metaid__ = self.__get_meta_id(meta=meta, season=season) + return __metaid__ in self._job_view + def success_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]: """ 获取某项任务成功的任务 @@ -313,6 +325,16 @@ class JobManager: return [] return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"] + def all_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]: + """ + 获取全部任务 + """ + __mediaid__ = self.__get_media_id(media=media, season=season) + with job_lock: + if __mediaid__ not in self._job_view: + return [] + return self._job_view[__mediaid__].tasks + def count(self, media: MediaInfo, season: Optional[int] = None) -> int: """ 获取某项任务成功总数 @@ -478,7 +500,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 更新文件清单 with job_lock: transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, []) - + # 发送通知,实时手动整理时不发 if transferinfo.need_notify and (task.background or not task.manual): se_str = None @@ -493,7 +515,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): transferinfo=transferinfo, season_episode=se_str, username=task.username) - + # 刮削事件 if transferinfo.need_scrape and self.__is_media_file(task.fileitem): self.eventmanager.send_event(EventType.MetadataScrape, { @@ -503,7 +525,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 'file_list': transferinfo.file_list_new, 'overwrite': False }) - + # 移除已完成的任务 with job_lock: self.jobview.remove_job(task) @@ -544,22 +566,22 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): username=task.username, link=settings.MP_DOMAIN('#/history') )) - + # 整理失败 with job_lock: self.jobview.fail_task(task) - + # 全部整理完成且有成功的任务时 if self.jobview.is_finished(task): # 发送消息、刮削事件、移除任务 __all_finished() - + return False, transferinfo.message # task转移成功 with job_lock: self.jobview.finish_task(task) - + logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}") # 新增task转移成功历史记录 @@ -583,10 +605,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 'downloader': task.downloader, 'download_hash': task.download_hash, }) - - # 下载器整理任务更新任务标签 - if task.download_hash: - self.transfer_completed(hashs=task.download_hash, downloader=task.downloader) # task登记转移成功文件清单 target_dir_path = transferinfo.target_diritem.path @@ -618,6 +636,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 发送消息、刮削事件、移除任务 __all_finished() + # 全部整理完成不管成功还是失败 + if self.jobview.is_done(task): + # 所有任务 + tasks = self.jobview.all_tasks() + for t in tasks: + if t.download_hash: + # 设置种子状态为已整理 + self.transfer_completed(hashs=t.download_hash, downloader=t.downloader) + return True, "" def put_to_queue(self, task: TransferTask): @@ -915,15 +942,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ 获取下载器中的种子列表,并执行整理 """ - - # 全局锁,避免重复处理 + # 全局锁,避免定时服务重复 with downloader_lock: # 获取下载器监控目录 download_dirs = DirectoryHelper().get_download_dirs() + # 如果没有下载器监控的目录则不处理 if not any(dir_info.monitor_type == "downloader" and dir_info.storage == "local" for dir_info in download_dirs): return True + logger.info("开始整理下载器中已经完成下载的文件 ...") # 从下载器获取种子列表 @@ -938,11 +966,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): for torrent in torrents: if global_vars.is_system_stopped: break + # 文件路径 file_path = torrent.path if not file_path.exists(): logger.warn(f"文件不存在:{file_path}") continue + # 检查是否为下载器监控目录中的文件 is_downloader_monitor = False for dir_info in download_dirs: @@ -956,15 +986,25 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if not is_downloader_monitor: logger.debug(f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理") continue + # 查询下载记录识别情况 downloadhis: DownloadHistory = DownloadHistoryOper().get_by_hash(torrent.hash) if downloadhis: + # 获取自定义识别词 + custom_words_list = None + if downloadhis.custom_words: + custom_words_list = downloadhis.custom_words.split('\n') + # 类型 try: mtype = MediaType(downloadhis.type) except ValueError: mtype = MediaType.TV - # 按TMDBID识别 + + # 识别元数据 + metainfo = MetaInfoPath(file_path, custom_words=custom_words_list) + + # 识别媒体信息 mediainfo = self.recognize_media(mtype=mtype, tmdbid=downloadhis.tmdbid, doubanid=downloadhis.doubanid, @@ -975,10 +1015,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 更新自定义媒体类别 if downloadhis.media_category: mediainfo.category = downloadhis.media_category + else: # 非MoviePilot下载的任务,按文件识别 + metainfo = MetaInfoPath(file_path) mediainfo = None + # 检查是否已经有任务处理中,如有则跳过本次整理 + if self.jobview.has_tasks(meta=metainfo, mediainfo=mediainfo): + logger.info(f"有任务正在整理中,跳过本次整理 ...") + return False + # 执行异步整理,匹配源目录 self.do_transfer( fileitem=FileItem(