diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 3c23ac9b..b5781ee0 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1075,10 +1075,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): __notify() # 只要该种子的所有任务都已整理完成,则设置种子状态为已整理 - if task.download_hash and self.jobview.is_torrent_done(task.download_hash): - self.transfer_completed( - hashs=task.download_hash, downloader=task.downloader - ) + self.__mark_torrent_completed_if_done(task.download_hash, task.downloader) # 移动模式,全部成功时删除空目录和种子文件 if transferinfo.transfer_type in ["move"]: @@ -1136,6 +1133,22 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ return self.jobview.add_task(task) + def __mark_torrent_completed_if_done( + self, + download_hash: Optional[str], + downloader: Optional[str], + history_exists: bool = True, + ): + """ + 当同一种子的任务都已结束时,回写下载器已整理标签。 + """ + if ( + history_exists + and download_hash + and self.jobview.is_torrent_done(download_hash) + ): + self.transfer_completed(hashs=download_hash, downloader=downloader) + def remove_from_queue(self, fileitem: FileItem): """ 从待整理队列移除 @@ -1149,10 +1162,6 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 标记异常整理任务失败并清理作业视图 """ self.jobview.fail_unfinished_task(task) - if task.download_hash and self.jobview.is_torrent_done(task.download_hash): - self.transfer_completed( - hashs=task.download_hash, downloader=task.downloader - ) self.jobview.try_remove_job(task) def __start_transfer(self): @@ -1328,6 +1337,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ) # 任务失败,直接移除task self.jobview.remove_task(task.fileitem) + self.__mark_torrent_completed_if_done( + task.download_hash, task.downloader + ) # AI智能体自动重试整理 if ( @@ -1953,10 +1965,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") return False, f"{fileitem.name} 没有找到可整理的媒体文件" - logger.info(f"正在计划整理 {len(file_items)} 个文件...") + planned_file_count = len(file_items) + logger.info(f"正在计划整理 {planned_file_count} 个文件...") # 整理所有文件 transfer_tasks: List[TransferTask] = [] + skipped_history_count = 0 + skipped_torrents = set() try: for file_item, bluray_dir in file_items: if global_vars.is_system_stopped: @@ -1971,8 +1986,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): file_item.path, storage=file_item.storage ) if transferd: + skipped_history_count += 1 if not transferd.status: all_success = False + candidate_hash = download_hash or transferd.download_hash + candidate_downloader = downloader or transferd.downloader + if candidate_hash and candidate_downloader: + skipped_torrents.add( + (candidate_hash, candidate_downloader) + ) logger.info( f"{file_item.path} 已整理过,如需重新处理,请删除整理记录。" ) @@ -2139,6 +2161,18 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): progress.update(value=100, text=__end_msg, data={}) progress.end() + # 下载器任务在这一轮可能因为历史记录全部命中而没有进入整理队列, + # 这里补打一遍已整理标签,避免同一种子被重复扫描。 + if ( + skipped_history_count == planned_file_count + and skipped_torrents + ): + for skipped_hash, skipped_downloader in skipped_torrents: + logger.info(f"补充设置下载任务已整理标签:{skipped_hash}") + self.__mark_torrent_completed_if_done( + skipped_hash, skipped_downloader + ) + error_msg = "、".join(err_msgs[:2]) + ( f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else "" ) diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py index 55939b25..8b8553d7 100644 --- a/tests/test_transfer_job_manager.py +++ b/tests/test_transfer_job_manager.py @@ -1,4 +1,6 @@ import unittest +from types import SimpleNamespace +from unittest.mock import patch from app.chain.transfer import JobManager, TransferChain from app.schemas import FileItem, TransferTask @@ -187,7 +189,7 @@ class TransferJobManagerTest(unittest.TestCase): self.assertEqual(1, len(jobs)) self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem) - def test_exception_failure_marks_downloader_hash_completed_before_cleanup(self): + def test_exception_failure_does_not_mark_downloader_without_history(self): chain = object.__new__(TransferChain) chain.jobview = JobManager() completed = [] @@ -204,6 +206,129 @@ class TransferJobManagerTest(unittest.TestCase): chain._TransferChain__fail_transfer_task(task) + self.assertEqual([], completed) + self.assertEqual([], chain.jobview.list_jobs()) + + def test_successful_history_skip_marks_downloader_hash_completed(self): + chain = object.__new__(TransferChain) + chain.jobview = JobManager() + completed = [] + + def fake_transfer_completed(hashs, downloader): + completed.append((hashs, downloader)) + + chain.transfer_completed = fake_transfer_completed + chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [ + (fileitem, False) + ] + + fileitem = make_task(1).fileitem + history = SimpleNamespace( + status=True, + download_hash="abc123", + downloader="qbittorrent", + ) + transfer_history_oper = SimpleNamespace( + get_by_src=lambda src, storage=None: history + ) + system_config_oper = SimpleNamespace(get=lambda key: None) + + with patch( + "app.chain.transfer.TransferHistoryOper", + return_value=transfer_history_oper, + ), patch( + "app.chain.transfer.SystemConfigOper", + return_value=system_config_oper, + ): + state, errmsg = TransferChain.do_transfer( + chain, + fileitem=fileitem, + downloader="qbittorrent", + download_hash="abc123", + background=False, + ) + + self.assertTrue(state) + self.assertEqual("Test.Show.S01E01.mkv 已整理过", errmsg) + self.assertEqual([("abc123", "qbittorrent")], completed) + + def test_failed_history_skip_still_marks_downloader_hash_completed(self): + chain = object.__new__(TransferChain) + chain.jobview = JobManager() + completed = [] + + def fake_transfer_completed(hashs, downloader): + completed.append((hashs, downloader)) + + chain.transfer_completed = fake_transfer_completed + chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [ + (fileitem, False) + ] + + fileitem = make_task(1).fileitem + history = SimpleNamespace( + status=False, + download_hash="abc123", + downloader="qbittorrent", + ) + transfer_history_oper = SimpleNamespace( + get_by_src=lambda src, storage=None: history + ) + system_config_oper = SimpleNamespace(get=lambda key: None) + + with patch( + "app.chain.transfer.TransferHistoryOper", + return_value=transfer_history_oper, + ), patch( + "app.chain.transfer.SystemConfigOper", + return_value=system_config_oper, + ): + state, errmsg = TransferChain.do_transfer( + chain, + fileitem=fileitem, + downloader="qbittorrent", + download_hash="abc123", + background=False, + ) + + self.assertFalse(state) + self.assertEqual("Test.Show.S01E01.mkv 已整理过", errmsg) + self.assertEqual([("abc123", "qbittorrent")], completed) + + def test_unrecognized_task_marks_downloader_hash_completed(self): + chain = object.__new__(TransferChain) + chain.jobview = JobManager() + chain.post_message = lambda *_args, **_kwargs: None + completed = [] + + def fake_transfer_completed(hashs, downloader): + completed.append((hashs, downloader)) + + chain.transfer_completed = fake_transfer_completed + task = make_task(1) + task.downloader = "qbittorrent" + task.download_hash = "abc123" + self.assertTrue(chain.jobview.add_task(task)) + + transfer_history_oper = SimpleNamespace( + add_fail=lambda **kwargs: SimpleNamespace(id=1) + ) + + with patch( + "app.chain.transfer.TransferHistoryOper", + return_value=transfer_history_oper, + ), patch( + "app.chain.transfer.MediaChain" + ) as media_chain_cls, patch( + "app.chain.transfer.settings.AI_AGENT_ENABLE", False + ), patch( + "app.chain.transfer.settings.AI_AGENT_RETRY_TRANSFER", False + ): + media_chain_cls.return_value.recognize_by_meta.return_value = None + state, errmsg = chain._TransferChain__handle_transfer(task) + + self.assertFalse(state) + self.assertEqual("未识别到媒体信息", errmsg) self.assertEqual([("abc123", "qbittorrent")], completed) self.assertEqual([], chain.jobview.list_jobs())