From 81828948dde2de67deba2a28a5071fac09868b8a Mon Sep 17 00:00:00 2001 From: InfinityPacer Date: Tue, 14 Apr 2026 14:17:58 +0800 Subject: [PATCH] fix(transfer): tighten queue cleanup edge cases --- app/chain/transfer.py | 34 ++++++++++++++++++------ tests/test_transfer_job_manager.py | 42 +++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 2f06e1fb..eae07853 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -192,7 +192,7 @@ class JobManager: """ 将任务从 meta 作业迁移到 media 作业 """ - curr_task = self.remove_task(task.fileitem) + curr_task, source_job_id = self.__remove_task_with_job_id(task.fileitem) if not self.add_task(task, state=curr_task.state if curr_task else "waiting"): return False if curr_task and task.mediainfo: @@ -200,7 +200,7 @@ class JobManager: meta=task.meta, season=task.meta.begin_season ) mediaid = self.__get_id(task) - if mediaid != metaid: + if source_job_id == metaid and mediaid != metaid: with job_lock: self._meta_to_media_ids.setdefault(metaid, set()).add(mediaid) return True @@ -297,6 +297,15 @@ class JobManager: """ 根据文件项移除任务 """ + task, _ = self.__remove_task_with_job_id(fileitem) + return task + + def __remove_task_with_job_id( + self, fileitem: FileItem + ) -> Tuple[Optional[TransferJobTask], Optional[Tuple]]: + """ + 根据文件项移除任务,并返回任务所在的作业ID + """ with job_lock: for mediaid in list(self._job_view): job = self._job_view[mediaid] @@ -312,8 +321,8 @@ class JobManager: set(self._season_episodes[mediaid]) - set(task.meta.episode_list) ) - return task - return None + return task, mediaid + return None, None def remove_job(self, task: TransferTask) -> Optional[TransferJob]: """ @@ -1020,6 +1029,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return self.jobview.remove_task(fileitem) + def __fail_transfer_task(self, task: TransferTask): + """ + 标记异常整理任务失败并清理作业视图 + """ + self.jobview.fail_unfinished_task(task) + if task.download_hash and self.jobview.is_torrent_done(task.download_hash): + self.transfer_completed( + hashs=task.download_hash, downloader=task.downloader + ) + self.jobview.try_remove_job(task) + def __start_transfer(self): """ 处理队列 @@ -1096,8 +1116,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.error( f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}" ) - self.jobview.fail_unfinished_task(task) - self.jobview.try_remove_job(task) + self.__fail_transfer_task(task) with task_lock: self._processed_num += 1 self._fail_num += 1 @@ -1832,8 +1851,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): f"{transfer_task.fileitem.name} 整理任务处理出现错误:" f"{e} - {traceback.format_exc()}" ) - self.jobview.fail_unfinished_task(transfer_task) - self.jobview.try_remove_job(transfer_task) + self.__fail_transfer_task(transfer_task) state, err_msg = False, str(e) if not state: all_success = False diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py index 25b807e6..55939b25 100644 --- a/tests/test_transfer_job_manager.py +++ b/tests/test_transfer_job_manager.py @@ -1,6 +1,6 @@ import unittest -from app.chain.transfer import JobManager +from app.chain.transfer import JobManager, TransferChain from app.schemas import FileItem, TransferTask from app.schemas.types import MediaType @@ -167,6 +167,46 @@ class TransferJobManagerTest(unittest.TestCase): self.assertEqual(1, len(jobs)) self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem) + def test_pre_recognized_migrations_with_same_meta_do_not_link_jobs(self): + jobview = JobManager() + task1 = make_task(1) + task2 = make_task(2) + task1.mediainfo = FakeMedia(100) + task2.mediainfo = FakeMedia(200) + + self.assertTrue(jobview.add_task(task1)) + self.assertTrue(jobview.add_task(task2)) + + self.assertTrue(jobview.migrate_task(task1)) + self.assertTrue(jobview.migrate_task(task2)) + jobview.running_task(task1) + jobview.finish_task(task1) + jobview.try_remove_job(task1) + + jobs = jobview.list_jobs() + self.assertEqual(1, len(jobs)) + self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem) + + def test_exception_failure_marks_downloader_hash_completed_before_cleanup(self): + chain = object.__new__(TransferChain) + chain.jobview = JobManager() + completed = [] + + def fake_transfer_completed(hashs, downloader): + completed.append((hashs, downloader)) + + chain.transfer_completed = fake_transfer_completed + task = make_task(1) + task.downloader = "qbittorrent" + task.download_hash = "abc123" + self.assertTrue(chain.jobview.add_task(task)) + chain.jobview.running_task(task) + + chain._TransferChain__fail_transfer_task(task) + + self.assertEqual([("abc123", "qbittorrent")], completed) + self.assertEqual([], chain.jobview.list_jobs()) + if __name__ == "__main__": unittest.main()