From 6aec326d05230b3988dfb3241442126c3d3dce2e Mon Sep 17 00:00:00 2001 From: InfinityPacer Date: Tue, 14 Apr 2026 13:01:59 +0800 Subject: [PATCH] fix(transfer): fail stale queue tasks on errors --- app/chain/transfer.py | 37 +++++++++++++++++++++++++++--- tests/test_transfer_job_manager.py | 33 ++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 01550305..9ad765ae 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -268,6 +268,26 @@ class JobManager: - set(task.meta.episode_list) ) + def fail_unfinished_task(self, task: TransferTask): + """ + 将指定任务视图中的非终态任务标记为失败 + """ + if not task or not task.fileitem: + return + with job_lock: + for mediaid, job in self._job_view.items(): + for job_task in job.tasks: + if job_task.fileitem != task.fileitem: + continue + if job_task.state not in ["completed", "failed"]: + job_task.state = "failed" + if mediaid in self._season_episodes: + self._season_episodes[mediaid] = list( + set(self._season_episodes[mediaid]) + - set(task.meta.episode_list) + ) + return + def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]: """ 根据文件项移除任务 @@ -1071,6 +1091,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.error( f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}" ) + self.jobview.fail_unfinished_task(task) + self.jobview.try_remove_job(task) with task_lock: self._processed_num += 1 self._fail_num += 1 @@ -1803,9 +1825,18 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): "finished": finished_files, }, ) - state, err_msg = self.__handle_transfer( - task=transfer_task, callback=self.__default_callback - ) + try: + state, err_msg = self.__handle_transfer( + task=transfer_task, callback=self.__default_callback + ) + except Exception as e: + logger.error( + f"{transfer_task.fileitem.name} 整理任务处理出现错误:" + f"{e} - {traceback.format_exc()}" + ) + self.jobview.fail_unfinished_task(transfer_task) + self.jobview.try_remove_job(transfer_task) + state, err_msg = False, str(e) if not state: all_success = False logger.warn(f"{transfer_task.fileitem.name} {err_msg}") diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py index 53049d3f..bf761d3f 100644 --- a/tests/test_transfer_job_manager.py +++ b/tests/test_transfer_job_manager.py @@ -126,6 +126,39 @@ class TransferJobManagerTest(unittest.TestCase): migrate_to_media_job(jobview, tasks[1]) self.assertEqual([], jobview.list_jobs()) + def test_exception_marks_unfinished_meta_task_failed_and_cleans_jobs(self): + jobview = JobManager() + tasks = [make_task(episode) for episode in range(1, 3)] + for task in tasks: + self.assertTrue(jobview.add_task(task)) + + migrate_to_media_job(jobview, tasks[0]) + jobview.running_task(tasks[1]) + + jobview.fail_unfinished_task(tasks[1]) + jobview.try_remove_job(tasks[1]) + + self.assertEqual([], jobview.list_jobs()) + + def test_exception_marks_unfinished_media_task_failed_and_cleans_jobs(self): + jobview = JobManager() + task = make_task(1) + self.assertTrue(jobview.add_task(task)) + + curr_task = jobview.remove_task(task.fileitem) + task.mediainfo = FakeMedia() + jobview.add_task( + task, + state=curr_task.state if curr_task else "waiting", + link_meta_job=curr_task is not None, + ) + jobview.running_task(task) + + jobview.fail_unfinished_task(task) + jobview.try_remove_job(task) + + self.assertEqual([], jobview.list_jobs()) + def test_pre_recognized_jobs_with_same_meta_do_not_block_each_other(self): jobview = JobManager() task1 = make_task(1)