feat(transfer): enhance job removal methods for thread safety and strict checks

This commit is contained in:
jxxghp
2026-02-01 16:58:32 +08:00
parent eda306d726
commit 7b8cd37a9b

View File

@@ -224,7 +224,7 @@ class JobManager:
def remove_job(self, task: TransferTask) -> Optional[TransferJob]:
"""
移除任务对应的作业
移除任务对应的作业(强制,线程不安全)
"""
with job_lock:
__mediaid__ = self.__get_id(task)
@@ -235,68 +235,99 @@ class JobManager:
return self._job_view.pop(__mediaid__)
return None
def try_remove_job(self, task: TransferTask):
"""
尝试移除任务对应的作业(严格检查未完成作业,线程安全)
"""
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
meta_done = True
if __metaid__ in self._job_view:
meta_done = all(
t.state in ["completed", "failed"] for t in self._job_view[__metaid__].tasks
)
media_done = True
if __mediaid__ in self._job_view:
media_done = all(
t.state in ["completed", "failed"] for t in self._job_view[__mediaid__].tasks
)
if meta_done and media_done:
__id__ = self.__get_id(task)
if __id__ in self._job_view:
# 移除季集信息
if __id__ in self._season_episodes:
self._season_episodes.pop(__id__)
self._job_view.pop(__id__)
def is_done(self, task: TransferTask) -> bool:
"""
检查任务对应的作业是否整理完成(不管成功还是失败)
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_done = True
if __mediaid__ in self._job_view:
media_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_done = True
return meta_done and media_done
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_done = True
if __mediaid__ in self._job_view:
media_done = all(
task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_done = True
return meta_done and media_done
def is_finished(self, task: TransferTask) -> bool:
"""
检查任务对应的作业是否已完成且有成功的记录
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_finished = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_finished = True
if __mediaid__ in self._job_view:
tasks = self._job_view[__mediaid__].tasks
media_finished = all(
task.state in ["completed", "failed"] for task in tasks
) and any(
task.state == "completed" for task in tasks
)
else:
media_finished = True
return meta_finished and media_finished
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_finished = all(
task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_finished = True
if __mediaid__ in self._job_view:
tasks = self._job_view[__mediaid__].tasks
media_finished = all(
task.state in ["completed", "failed"] for task in tasks
) and any(
task.state == "completed" for task in tasks
)
else:
media_finished = True
return meta_finished and media_finished
def is_success(self, task: TransferTask) -> bool:
"""
检查任务对应的作业是否全部成功
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_success = all(
task.state in ["completed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_success = True
if __mediaid__ in self._job_view:
media_success = all(
task.state in ["completed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_success = True
return meta_success and media_success
with job_lock:
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __metaid__ in self._job_view:
meta_success = all(
task.state in ["completed"] for task in self._job_view[__metaid__].tasks
)
else:
meta_success = True
if __mediaid__ in self._job_view:
media_success = all(
task.state in ["completed"] for task in self._job_view[__mediaid__].tasks
)
else:
media_success = True
return meta_success and media_success
def get_all_torrent_hashes(self) -> set[str]:
"""
@@ -337,74 +368,82 @@ class JobManager:
"""
判断作业是否还有任务正在处理
"""
if mediainfo:
__mediaid__ = self.__get_media_id(media=mediainfo, season=season)
if __mediaid__ in self._job_view:
return True
with job_lock:
if mediainfo:
__mediaid__ = self.__get_media_id(media=mediainfo, season=season)
if __mediaid__ in self._job_view:
return True
__metaid__ = self.__get_meta_id(meta=meta, season=season)
return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0
__metaid__ = self.__get_meta_id(meta=meta, season=season)
return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0
def success_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]:
"""
获取作业中所有成功的任务
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
def all_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]:
"""
获取作业中全部任务
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return self._job_view[__mediaid__].tasks
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return self._job_view[__mediaid__].tasks
def count(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取作业中成功总数
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
def size(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取作业中所有成功文件总大小
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return sum([
task.fileitem.size if task.fileitem.size is not None
else (
SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0)
for task in self._job_view[__mediaid__].tasks
if task.state == "completed"
])
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return 0
return sum([
task.fileitem.size if task.fileitem.size is not None
else (
SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0)
for task in self._job_view[__mediaid__].tasks
if task.state == "completed"
])
def total(self) -> int:
"""
获取所有任务总数
"""
return sum([len(job.tasks) for job in self._job_view.values()])
with job_lock:
return sum([len(job.tasks) for job in self._job_view.values()])
def list_jobs(self) -> List[TransferJob]:
"""
获取所有作业的任务列表
"""
return list(self._job_view.values())
with job_lock:
return list(self._job_view.values())
def season_episodes(self, media: MediaInfo, season: Optional[int] = None) -> List[int]:
"""
获取作业的季集清单
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
return self._season_episodes.get(__mediaid__) or []
with job_lock:
__mediaid__ = self.__get_media_id(media=media, season=season)
return self._season_episodes.get(__mediaid__) or []
class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
@@ -1006,8 +1045,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
finally:
# 移除已完成的任务
if self.jobview.is_done(task):
self.jobview.remove_job(task)
self.jobview.try_remove_job(task)
def get_queue_tasks(self) -> List[TransferJob]:
"""