From d36dd69ec330560f55dec3ad1dd79839d088cd7e Mon Sep 17 00:00:00 2001 From: InfinityPacer Date: Tue, 14 Apr 2026 13:00:51 +0800 Subject: [PATCH] fix(transfer): clean migrated queue jobs --- app/chain/transfer.py | 72 ++++++++++---- tests/test_transfer_job_manager.py | 149 +++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+), 20 deletions(-) create mode 100644 tests/test_transfer_job_manager.py diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 5a46d3d1..01550305 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -74,10 +74,13 @@ class JobManager: _job_view: Dict[Tuple, TransferJob] = {} # 汇总季集清单 _season_episodes: Dict[Tuple, List[int]] = {} + # 记录从 meta 作业迁移到 media 作业的关系,用于清理提前失败后残留的 media 作业 + _meta_to_media_ids: Dict[Tuple, set[Tuple]] = {} def __init__(self): self._job_view = {} self._season_episodes = {} + self._meta_to_media_ids = {} @staticmethod def __get_meta_id(meta: MetaBase = None, season: Optional[int] = None) -> Tuple: @@ -133,7 +136,12 @@ class JobManager: """ return schemas.MetaInfo(**task.meta.to_dict()) - def add_task(self, task: TransferTask, state: Optional[str] = "waiting") -> bool: + def add_task( + self, + task: TransferTask, + state: Optional[str] = "waiting", + link_meta_job: Optional[bool] = False, + ) -> bool: """ 添加整理任务,自动分组到对应的作业中 :return: True表示任务已添加,False表示任务无效或已存在(重复) @@ -142,6 +150,9 @@ class JobManager: return False with job_lock: __mediaid__ = self.__get_id(task) + __metaid__ = self.__get_meta_id( + meta=task.meta, season=task.meta.begin_season + ) if __mediaid__ not in self._job_view: self._job_view[__mediaid__] = TransferJob( media=self.__get_media(task), @@ -175,6 +186,10 @@ class JobManager: state=state, ) ) + if link_meta_job and task.mediainfo and __mediaid__ != __metaid__: + self._meta_to_media_ids.setdefault(__metaid__, set()).add( + __mediaid__ + ) # 添加季集信息 if self._season_episodes.get(__mediaid__): self._season_episodes[__mediaid__].extend(task.meta.episode_list) @@ -185,6 +200,26 @@ class JobManager: self._season_episodes[__mediaid__] = task.meta.episode_list return True + def __is_job_done(self, job_id: Tuple) -> bool: + """ + 检查指定作业是否已完成 + """ + if job_id not in self._job_view: + return True + return all( + task.state in ["completed", "failed"] + for task in self._job_view[job_id].tasks + ) + + def __pop_job(self, job_id: Tuple): + """ + 移除指定作业和对应季集缓存 + """ + if job_id in self._season_episodes: + self._season_episodes.pop(job_id) + if job_id in self._job_view: + self._job_view.pop(job_id) + def running_task(self, task: TransferTask): """ 设置任务为运行中 @@ -280,27 +315,20 @@ class JobManager: media=task.mediainfo, season=task.meta.begin_season ) - meta_done = True - if __metaid__ in self._job_view: - meta_done = all( - t.state in ["completed", "failed"] - for t in self._job_view[__metaid__].tasks - ) + related_media_ids = set(self._meta_to_media_ids.get(__metaid__, set())) + if task.mediainfo: + related_media_ids.add(__mediaid__) - media_done = True - if __mediaid__ in self._job_view: - media_done = all( - t.state in ["completed", "failed"] - for t in self._job_view[__mediaid__].tasks - ) + meta_done = self.__is_job_done(__metaid__) + media_done = all( + self.__is_job_done(mediaid) for mediaid in related_media_ids + ) if meta_done and media_done: - __id__ = self.__get_id(task) - if __id__ in self._job_view: - # 移除季集信息 - if __id__ in self._season_episodes: - self._season_episodes.pop(__id__) - self._job_view.pop(__id__) + remove_ids = {__metaid__, self.__get_id(task), *related_media_ids} + for job_id in remove_ids: + self.__pop_job(job_id) + self._meta_to_media_ids.pop(__metaid__, None) def is_done(self, task: TransferTask) -> bool: """ @@ -1080,6 +1108,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): transferhis = TransferHistoryOper() mediainfo = task.mediainfo mediainfo_changed = False + link_meta_job = False if not mediainfo: download_history = task.download_history # 下载用户 @@ -1156,6 +1185,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return False, "未识别到媒体信息" mediainfo_changed = True + link_meta_job = True # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title if not settings.SCRAP_FOLLOW_TMDB: @@ -1172,7 +1202,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 更新队列任务 curr_task = self.jobview.remove_task(task.fileitem) self.jobview.add_task( - task, state=curr_task.state if curr_task else "waiting" + task, + state=curr_task.state if curr_task else "waiting", + link_meta_job=link_meta_job and curr_task is not None, ) # 获取集数据 diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py new file mode 100644 index 00000000..53049d3f --- /dev/null +++ b/tests/test_transfer_job_manager.py @@ -0,0 +1,149 @@ +import unittest + +from app.chain.transfer import JobManager +from app.schemas import FileItem, TransferTask +from app.schemas.types import MediaType + + +class FakeMeta: + def __init__(self, episode: int): + self.name = "Test Show" + self.title = f"Test Show S01E{episode:02d}" + self.year = "2026" + self.type = MediaType.TV + self.begin_season = 1 + self.end_season = None + self.total_season = 1 + self.begin_episode = episode + self.end_episode = None + self.total_episode = 1 + self.episode_list = [episode] + self.season_episode = f"S01E{episode:02d}" + self.part = None + + @property + def season(self): + return "S01" + + @property + def episode(self): + return f"E{self.begin_episode:02d}" + + def to_dict(self): + return { + "title": self.title, + "name": self.name, + "year": self.year, + "type": self.type.value, + "begin_season": self.begin_season, + "end_season": self.end_season, + "total_season": self.total_season, + "begin_episode": self.begin_episode, + "end_episode": self.end_episode, + "total_episode": self.total_episode, + "season_episode": self.season_episode, + "episode_list": self.episode_list, + "part": self.part, + } + + +class FakeMedia: + def __init__(self, tmdb_id: int = 12345): + self.tmdb_id = tmdb_id + self.douban_id = None + + def clear(self): + pass + + def to_dict(self): + return { + "type": MediaType.TV.value, + "title": "Test Show", + "year": "2026", + "title_year": "Test Show (2026)", + "tmdb_id": self.tmdb_id, + "douban_id": self.douban_id, + } + + +def make_task(episode: int) -> TransferTask: + name = f"Test.Show.S01E{episode:02d}.mkv" + return TransferTask( + fileitem=FileItem( + storage="local", + path=f"/downloads/Test Show/{name}", + type="file", + name=name, + basename=name.removesuffix(".mkv"), + extension="mkv", + size=1024, + ), + meta=FakeMeta(episode), + ) + + +def migrate_to_media_job(jobview: JobManager, task: TransferTask): + curr_task = jobview.remove_task(task.fileitem) + task.mediainfo = FakeMedia() + jobview.add_task( + task, + state=curr_task.state if curr_task else "waiting", + link_meta_job=curr_task is not None, + ) + jobview.running_task(task) + jobview.finish_task(task) + jobview.try_remove_job(task) + + +class TransferJobManagerTest(unittest.TestCase): + def test_completed_media_job_is_removed_after_last_meta_task_fails(self): + jobview = JobManager() + tasks = [make_task(episode) for episode in range(1, 4)] + for task in tasks: + self.assertTrue(jobview.add_task(task)) + + migrate_to_media_job(jobview, tasks[0]) + migrate_to_media_job(jobview, tasks[1]) + + # 还有一个 meta 任务未处理时,media 组虽然已完成也不能提前清理。 + self.assertEqual(2, len(jobview.list_jobs())) + + # 最后一个仍在 meta 组中的任务未识别,__handle_transfer 会直接 remove_task 后 return。 + jobview.remove_task(tasks[2].fileitem) + jobview.try_remove_job(tasks[2]) + + self.assertEqual([], jobview.list_jobs()) + + def test_completed_media_job_is_removed_after_all_meta_tasks_migrate(self): + jobview = JobManager() + tasks = [make_task(episode) for episode in range(1, 3)] + for task in tasks: + self.assertTrue(jobview.add_task(task)) + + migrate_to_media_job(jobview, tasks[0]) + self.assertEqual(2, len(jobview.list_jobs())) + + migrate_to_media_job(jobview, tasks[1]) + self.assertEqual([], jobview.list_jobs()) + + def test_pre_recognized_jobs_with_same_meta_do_not_block_each_other(self): + jobview = JobManager() + task1 = make_task(1) + task2 = make_task(2) + task1.mediainfo = FakeMedia(100) + task2.mediainfo = FakeMedia(200) + + self.assertTrue(jobview.add_task(task1)) + self.assertTrue(jobview.add_task(task2)) + + jobview.running_task(task1) + jobview.finish_task(task1) + jobview.try_remove_job(task1) + + jobs = jobview.list_jobs() + self.assertEqual(1, len(jobs)) + self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem) + + +if __name__ == "__main__": + unittest.main()