fix(transfer): fail stale queue tasks on errors

This commit is contained in:
InfinityPacer
2026-04-14 13:01:59 +08:00
committed by jxxghp
parent d36dd69ec3
commit 6aec326d05
2 changed files with 67 additions and 3 deletions

View File

@@ -268,6 +268,26 @@ class JobManager:
- set(task.meta.episode_list)
)
def fail_unfinished_task(self, task: TransferTask):
"""
将指定任务视图中的非终态任务标记为失败
"""
if not task or not task.fileitem:
return
with job_lock:
for mediaid, job in self._job_view.items():
for job_task in job.tasks:
if job_task.fileitem != task.fileitem:
continue
if job_task.state not in ["completed", "failed"]:
job_task.state = "failed"
if mediaid in self._season_episodes:
self._season_episodes[mediaid] = list(
set(self._season_episodes[mediaid])
- set(task.meta.episode_list)
)
return
def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]:
"""
根据文件项移除任务
@@ -1071,6 +1091,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
logger.error(
f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}"
)
self.jobview.fail_unfinished_task(task)
self.jobview.try_remove_job(task)
with task_lock:
self._processed_num += 1
self._fail_num += 1
@@ -1803,9 +1825,18 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"finished": finished_files,
},
)
state, err_msg = self.__handle_transfer(
task=transfer_task, callback=self.__default_callback
)
try:
state, err_msg = self.__handle_transfer(
task=transfer_task, callback=self.__default_callback
)
except Exception as e:
logger.error(
f"{transfer_task.fileitem.name} 整理任务处理出现错误:"
f"{e} - {traceback.format_exc()}"
)
self.jobview.fail_unfinished_task(transfer_task)
self.jobview.try_remove_job(transfer_task)
state, err_msg = False, str(e)
if not state:
all_success = False
logger.warn(f"{transfer_task.fileitem.name} {err_msg}")

View File

@@ -126,6 +126,39 @@ class TransferJobManagerTest(unittest.TestCase):
migrate_to_media_job(jobview, tasks[1])
self.assertEqual([], jobview.list_jobs())
def test_exception_marks_unfinished_meta_task_failed_and_cleans_jobs(self):
jobview = JobManager()
tasks = [make_task(episode) for episode in range(1, 3)]
for task in tasks:
self.assertTrue(jobview.add_task(task))
migrate_to_media_job(jobview, tasks[0])
jobview.running_task(tasks[1])
jobview.fail_unfinished_task(tasks[1])
jobview.try_remove_job(tasks[1])
self.assertEqual([], jobview.list_jobs())
def test_exception_marks_unfinished_media_task_failed_and_cleans_jobs(self):
jobview = JobManager()
task = make_task(1)
self.assertTrue(jobview.add_task(task))
curr_task = jobview.remove_task(task.fileitem)
task.mediainfo = FakeMedia()
jobview.add_task(
task,
state=curr_task.state if curr_task else "waiting",
link_meta_job=curr_task is not None,
)
jobview.running_task(task)
jobview.fail_unfinished_task(task)
jobview.try_remove_job(task)
self.assertEqual([], jobview.list_jobs())
def test_pre_recognized_jobs_with_same_meta_do_not_block_each_other(self):
jobview = JobManager()
task1 = make_task(1)