fix(transfer): clean migrated queue jobs

This commit is contained in:
InfinityPacer
2026-04-14 13:00:51 +08:00
committed by jxxghp
parent 1688063450
commit d36dd69ec3
2 changed files with 201 additions and 20 deletions

View File

@@ -74,10 +74,13 @@ class JobManager:
_job_view: Dict[Tuple, TransferJob] = {}
# 汇总季集清单
_season_episodes: Dict[Tuple, List[int]] = {}
# 记录从 meta 作业迁移到 media 作业的关系,用于清理提前失败后残留的 media 作业
_meta_to_media_ids: Dict[Tuple, set[Tuple]] = {}
def __init__(self):
self._job_view = {}
self._season_episodes = {}
self._meta_to_media_ids = {}
@staticmethod
def __get_meta_id(meta: MetaBase = None, season: Optional[int] = None) -> Tuple:
@@ -133,7 +136,12 @@ class JobManager:
"""
return schemas.MetaInfo(**task.meta.to_dict())
def add_task(self, task: TransferTask, state: Optional[str] = "waiting") -> bool:
def add_task(
self,
task: TransferTask,
state: Optional[str] = "waiting",
link_meta_job: Optional[bool] = False,
) -> bool:
"""
添加整理任务,自动分组到对应的作业中
:return: True表示任务已添加False表示任务无效或已存在重复
@@ -142,6 +150,9 @@ class JobManager:
return False
with job_lock:
__mediaid__ = self.__get_id(task)
__metaid__ = self.__get_meta_id(
meta=task.meta, season=task.meta.begin_season
)
if __mediaid__ not in self._job_view:
self._job_view[__mediaid__] = TransferJob(
media=self.__get_media(task),
@@ -175,6 +186,10 @@ class JobManager:
state=state,
)
)
if link_meta_job and task.mediainfo and __mediaid__ != __metaid__:
self._meta_to_media_ids.setdefault(__metaid__, set()).add(
__mediaid__
)
# 添加季集信息
if self._season_episodes.get(__mediaid__):
self._season_episodes[__mediaid__].extend(task.meta.episode_list)
@@ -185,6 +200,26 @@ class JobManager:
self._season_episodes[__mediaid__] = task.meta.episode_list
return True
def __is_job_done(self, job_id: Tuple) -> bool:
"""
检查指定作业是否已完成
"""
if job_id not in self._job_view:
return True
return all(
task.state in ["completed", "failed"]
for task in self._job_view[job_id].tasks
)
def __pop_job(self, job_id: Tuple):
"""
移除指定作业和对应季集缓存
"""
if job_id in self._season_episodes:
self._season_episodes.pop(job_id)
if job_id in self._job_view:
self._job_view.pop(job_id)
def running_task(self, task: TransferTask):
"""
设置任务为运行中
@@ -280,27 +315,20 @@ class JobManager:
media=task.mediainfo, season=task.meta.begin_season
)
meta_done = True
if __metaid__ in self._job_view:
meta_done = all(
t.state in ["completed", "failed"]
for t in self._job_view[__metaid__].tasks
)
related_media_ids = set(self._meta_to_media_ids.get(__metaid__, set()))
if task.mediainfo:
related_media_ids.add(__mediaid__)
media_done = True
if __mediaid__ in self._job_view:
media_done = all(
t.state in ["completed", "failed"]
for t in self._job_view[__mediaid__].tasks
)
meta_done = self.__is_job_done(__metaid__)
media_done = all(
self.__is_job_done(mediaid) for mediaid in related_media_ids
)
if meta_done and media_done:
__id__ = self.__get_id(task)
if __id__ in self._job_view:
# 移除季集信息
if __id__ in self._season_episodes:
self._season_episodes.pop(__id__)
self._job_view.pop(__id__)
remove_ids = {__metaid__, self.__get_id(task), *related_media_ids}
for job_id in remove_ids:
self.__pop_job(job_id)
self._meta_to_media_ids.pop(__metaid__, None)
def is_done(self, task: TransferTask) -> bool:
"""
@@ -1080,6 +1108,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transferhis = TransferHistoryOper()
mediainfo = task.mediainfo
mediainfo_changed = False
link_meta_job = False
if not mediainfo:
download_history = task.download_history
# 下载用户
@@ -1156,6 +1185,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return False, "未识别到媒体信息"
mediainfo_changed = True
link_meta_job = True
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
@@ -1172,7 +1202,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 更新队列任务
curr_task = self.jobview.remove_task(task.fileitem)
self.jobview.add_task(
task, state=curr_task.state if curr_task else "waiting"
task,
state=curr_task.state if curr_task else "waiting",
link_meta_job=link_meta_job and curr_task is not None,
)
# 获取集数据

View File

@@ -0,0 +1,149 @@
import unittest
from app.chain.transfer import JobManager
from app.schemas import FileItem, TransferTask
from app.schemas.types import MediaType
class FakeMeta:
def __init__(self, episode: int):
self.name = "Test Show"
self.title = f"Test Show S01E{episode:02d}"
self.year = "2026"
self.type = MediaType.TV
self.begin_season = 1
self.end_season = None
self.total_season = 1
self.begin_episode = episode
self.end_episode = None
self.total_episode = 1
self.episode_list = [episode]
self.season_episode = f"S01E{episode:02d}"
self.part = None
@property
def season(self):
return "S01"
@property
def episode(self):
return f"E{self.begin_episode:02d}"
def to_dict(self):
return {
"title": self.title,
"name": self.name,
"year": self.year,
"type": self.type.value,
"begin_season": self.begin_season,
"end_season": self.end_season,
"total_season": self.total_season,
"begin_episode": self.begin_episode,
"end_episode": self.end_episode,
"total_episode": self.total_episode,
"season_episode": self.season_episode,
"episode_list": self.episode_list,
"part": self.part,
}
class FakeMedia:
def __init__(self, tmdb_id: int = 12345):
self.tmdb_id = tmdb_id
self.douban_id = None
def clear(self):
pass
def to_dict(self):
return {
"type": MediaType.TV.value,
"title": "Test Show",
"year": "2026",
"title_year": "Test Show (2026)",
"tmdb_id": self.tmdb_id,
"douban_id": self.douban_id,
}
def make_task(episode: int) -> TransferTask:
name = f"Test.Show.S01E{episode:02d}.mkv"
return TransferTask(
fileitem=FileItem(
storage="local",
path=f"/downloads/Test Show/{name}",
type="file",
name=name,
basename=name.removesuffix(".mkv"),
extension="mkv",
size=1024,
),
meta=FakeMeta(episode),
)
def migrate_to_media_job(jobview: JobManager, task: TransferTask):
curr_task = jobview.remove_task(task.fileitem)
task.mediainfo = FakeMedia()
jobview.add_task(
task,
state=curr_task.state if curr_task else "waiting",
link_meta_job=curr_task is not None,
)
jobview.running_task(task)
jobview.finish_task(task)
jobview.try_remove_job(task)
class TransferJobManagerTest(unittest.TestCase):
def test_completed_media_job_is_removed_after_last_meta_task_fails(self):
jobview = JobManager()
tasks = [make_task(episode) for episode in range(1, 4)]
for task in tasks:
self.assertTrue(jobview.add_task(task))
migrate_to_media_job(jobview, tasks[0])
migrate_to_media_job(jobview, tasks[1])
# 还有一个 meta 任务未处理时media 组虽然已完成也不能提前清理。
self.assertEqual(2, len(jobview.list_jobs()))
# 最后一个仍在 meta 组中的任务未识别__handle_transfer 会直接 remove_task 后 return。
jobview.remove_task(tasks[2].fileitem)
jobview.try_remove_job(tasks[2])
self.assertEqual([], jobview.list_jobs())
def test_completed_media_job_is_removed_after_all_meta_tasks_migrate(self):
jobview = JobManager()
tasks = [make_task(episode) for episode in range(1, 3)]
for task in tasks:
self.assertTrue(jobview.add_task(task))
migrate_to_media_job(jobview, tasks[0])
self.assertEqual(2, len(jobview.list_jobs()))
migrate_to_media_job(jobview, tasks[1])
self.assertEqual([], jobview.list_jobs())
def test_pre_recognized_jobs_with_same_meta_do_not_block_each_other(self):
jobview = JobManager()
task1 = make_task(1)
task2 = make_task(2)
task1.mediainfo = FakeMedia(100)
task2.mediainfo = FakeMedia(200)
self.assertTrue(jobview.add_task(task1))
self.assertTrue(jobview.add_task(task2))
jobview.running_task(task1)
jobview.finish_task(task1)
jobview.try_remove_job(task1)
jobs = jobview.list_jobs()
self.assertEqual(1, len(jobs))
self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem)
if __name__ == "__main__":
unittest.main()