From 0ba8d51b2a136d1932e4d963005ecd7847a68d6e Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 21 Jan 2026 21:31:55 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E4=BC=98=E5=8C=96=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E5=99=A8=E6=95=B4=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/transfer.py | 115 ++++++++++++++++++++++++------------------ 1 file changed, 66 insertions(+), 49 deletions(-) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 82181778..40aafcf1 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -35,8 +35,11 @@ from app.utils.singleton import Singleton from app.utils.string import StringUtils from app.utils.system import SystemUtils +# 下载器锁 downloader_lock = threading.Lock() +# 作业锁 job_lock = threading.Lock() +# 任务锁 task_lock = threading.Lock() @@ -405,7 +408,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ self._queue_active = True for i in range(settings.TRANSFER_THREADS): - logger.info(f"启动文件整理线程 {i+1} ...") + logger.info(f"启动文件整理线程 {i + 1} ...") thread = threading.Thread(target=self.__start_transfer, name=f"transfer-{i}", daemon=True) @@ -463,7 +466,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 整理完成后处理 """ - def __do_finished(): + def __all_finished(): """ 完成时发送消息、刮削事件、移除任务等 """ @@ -473,7 +476,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): transferinfo.total_size = self.jobview.size(task.mediainfo, task.meta.begin_season) or task.fileitem.size # 更新文件清单 - transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, []) + with job_lock: + transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, []) + # 发送通知,实时手动整理时不发 if transferinfo.need_notify and (task.background or not task.manual): se_str = None @@ -488,6 +493,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): transferinfo=transferinfo, season_episode=se_str, username=task.username) + # 刮削事件 if transferinfo.need_scrape and self.__is_media_file(task.fileitem): self.eventmanager.send_event(EventType.MetadataScrape, { @@ -497,13 +503,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 'file_list': transferinfo.file_list_new, 'overwrite': False }) + # 移除已完成的任务 - self.jobview.remove_job(task) + with job_lock: + self.jobview.remove_job(task) transferhis = TransferHistoryOper() if not transferinfo.success: # 转移失败 logger.warn(f"{task.fileitem.name} 入库失败:{transferinfo.message}") + # 新增转移失败历史记录 transferhis.add_fail( fileitem=task.fileitem, @@ -514,6 +523,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): mediainfo=task.mediainfo, transferinfo=transferinfo ) + # 整理失败事件 if self.__is_media_file(task.fileitem): self.eventmanager.send_event(EventType.TransferFailed, { @@ -524,6 +534,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 'downloader': task.downloader, 'download_hash': task.download_hash, }) + # 发送失败消息 self.post_message(Notification( mtype=NotificationType.Manual, @@ -533,19 +544,25 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): username=task.username, link=settings.MP_DOMAIN('#/history') )) + # 整理失败 - self.jobview.fail_task(task) - with task_lock: - # 整理完成且有成功的任务时 - if self.jobview.is_finished(task): - __do_finished() + with job_lock: + self.jobview.fail_task(task) + + # 全部整理完成且有成功的任务时 + if self.jobview.is_finished(task): + # 发送消息、刮削事件、移除任务 + __all_finished() + return False, transferinfo.message - # 转移成功 - self.jobview.finish_task(task) + # task转移成功 + with job_lock: + self.jobview.finish_task(task) + logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}") - # 新增转移成功历史记录 + # 新增task转移成功历史记录 transferhis.add_success( fileitem=task.fileitem, mode=transferinfo.transfer_type if transferinfo else '', @@ -556,7 +573,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): transferinfo=transferinfo ) - # 整理完成事件 + # task整理完成事件 if self.__is_media_file(task.fileitem): self.eventmanager.send_event(EventType.TransferComplete, { 'fileitem': task.fileitem, @@ -566,34 +583,40 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 'downloader': task.downloader, 'download_hash': task.download_hash, }) + + # 下载器整理任务更新任务标签 + if task.download_hash: + self.transfer_completed(hashs=task.download_hash, downloader=task.downloader) - with task_lock: - # 登记转移成功文件清单 - target_dir_path = transferinfo.target_diritem.path - target_files = transferinfo.file_list_new + # task登记转移成功文件清单 + target_dir_path = transferinfo.target_diritem.path + target_files = transferinfo.file_list_new + with job_lock: if self._success_target_files.get(target_dir_path): self._success_target_files[target_dir_path].extend(target_files) else: self._success_target_files[target_dir_path] = target_files - # 全部整理成功时 - if self.jobview.is_success(task): - # 移动模式删除空目录 - if transferinfo.transfer_type in ["move"]: - # 所有成功的业务 - tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season) - storagechain = StorageChain() - # 获取整理屏蔽词 - transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) - for t in tasks: - if t.download_hash and self._can_delete_torrent(t.download_hash, t.downloader, - transfer_exclude_words): - if self.remove_torrents(t.download_hash, downloader=t.downloader): - logger.info(f"移动模式删除种子成功:{t.download_hash}") - if t.fileitem: - storagechain.delete_media_file(t.fileitem, delete_self=False) - # 整理完成且有成功的任务时 - if self.jobview.is_finished(task): - __do_finished() + + # 全部整理成功时 + if self.jobview.is_success(task): + # 移动模式删除空目录 + if transferinfo.transfer_type in ["move"]: + # 所有成功的业务 + tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season) + # 获取整理屏蔽词 + transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) + for t in tasks: + if t.download_hash and self._can_delete_torrent(t.download_hash, t.downloader, + transfer_exclude_words): + if self.remove_torrents(t.download_hash, downloader=t.downloader): + logger.info(f"移动模式删除种子成功:{t.download_hash}") + if t.fileitem: + StorageChain().delete_media_file(t.fileitem, delete_self=False) + + # 全部整理完成且有成功的任务时 + if self.jobview.is_finished(task): + # 发送消息、刮削事件、移除任务 + __all_finished() return True, "" @@ -616,7 +639,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ 添加到作业视图 """ - with task_lock: + with job_lock: self.jobview.add_task(task) def remove_from_queue(self, fileitem: FileItem): @@ -625,7 +648,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ if not fileitem: return - self.jobview.remove_task(fileitem) + with job_lock: + self.jobview.remove_task(fileitem) def __start_transfer(self): """ @@ -868,7 +892,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): finally: # 移除已完成的任务 - with task_lock: + with job_lock: if self.jobview.is_done(task): self.jobview.remove_job(task) @@ -955,8 +979,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 非MoviePilot下载的任务,按文件识别 mediainfo = None - # 执行实时整理,匹配源目录 - state, errmsg = self.do_transfer( + # 执行异步整理,匹配源目录 + self.do_transfer( fileitem=FileItem( storage="local", path=file_path.as_posix() + ("/" if file_path.is_dir() else ""), @@ -967,20 +991,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ), mediainfo=mediainfo, downloader=torrent.downloader, - download_hash=torrent.hash, - background=False, + download_hash=torrent.hash ) - # 设置下载任务状态 - if not state: - logger.warn(f"整理下载器任务失败:{torrent.hash} - {errmsg}") - self.transfer_completed(hashs=torrent.hash, downloader=torrent.downloader) finally: torrents.clear() del torrents - # 结束 - logger.info("所有下载器中下载完成的文件已整理完成") return True def __get_trans_fileitems(