From 7b8cd37a9b4beef59ac9520ef0dd433692d6738d Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 1 Feb 2026 16:58:32 +0800 Subject: [PATCH] feat(transfer): enhance job removal methods for thread safety and strict checks --- app/chain/transfer.py | 204 +++++++++++++++++++++++++----------------- 1 file changed, 121 insertions(+), 83 deletions(-) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 032cb992..a9930d7c 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -224,7 +224,7 @@ class JobManager: def remove_job(self, task: TransferTask) -> Optional[TransferJob]: """ - 移除任务对应的作业 + 移除任务对应的作业(强制,线程不安全) """ with job_lock: __mediaid__ = self.__get_id(task) @@ -235,68 +235,99 @@ class JobManager: return self._job_view.pop(__mediaid__) return None + def try_remove_job(self, task: TransferTask): + """ + 尝试移除任务对应的作业(严格检查未完成作业,线程安全) + """ + with job_lock: + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + + meta_done = True + if __metaid__ in self._job_view: + meta_done = all( + t.state in ["completed", "failed"] for t in self._job_view[__metaid__].tasks + ) + + media_done = True + if __mediaid__ in self._job_view: + media_done = all( + t.state in ["completed", "failed"] for t in self._job_view[__mediaid__].tasks + ) + + if meta_done and media_done: + __id__ = self.__get_id(task) + if __id__ in self._job_view: + # 移除季集信息 + if __id__ in self._season_episodes: + self._season_episodes.pop(__id__) + self._job_view.pop(__id__) + def is_done(self, task: TransferTask) -> bool: """ 检查任务对应的作业是否整理完成(不管成功还是失败) """ - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) - if __metaid__ in self._job_view: - meta_done = all( - task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks - ) - else: - meta_done = True - if __mediaid__ in self._job_view: - media_done = all( - task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks - ) - else: - media_done = True - return meta_done and media_done + with job_lock: + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + if __metaid__ in self._job_view: + meta_done = all( + task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks + ) + else: + meta_done = True + if __mediaid__ in self._job_view: + media_done = all( + task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks + ) + else: + media_done = True + return meta_done and media_done def is_finished(self, task: TransferTask) -> bool: """ 检查任务对应的作业是否已完成且有成功的记录 """ - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) - if __metaid__ in self._job_view: - meta_finished = all( - task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks - ) - else: - meta_finished = True - if __mediaid__ in self._job_view: - tasks = self._job_view[__mediaid__].tasks - media_finished = all( - task.state in ["completed", "failed"] for task in tasks - ) and any( - task.state == "completed" for task in tasks - ) - else: - media_finished = True - return meta_finished and media_finished + with job_lock: + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + if __metaid__ in self._job_view: + meta_finished = all( + task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks + ) + else: + meta_finished = True + if __mediaid__ in self._job_view: + tasks = self._job_view[__mediaid__].tasks + media_finished = all( + task.state in ["completed", "failed"] for task in tasks + ) and any( + task.state == "completed" for task in tasks + ) + else: + media_finished = True + return meta_finished and media_finished def is_success(self, task: TransferTask) -> bool: """ 检查任务对应的作业是否全部成功 """ - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) - if __metaid__ in self._job_view: - meta_success = all( - task.state in ["completed"] for task in self._job_view[__metaid__].tasks - ) - else: - meta_success = True - if __mediaid__ in self._job_view: - media_success = all( - task.state in ["completed"] for task in self._job_view[__mediaid__].tasks - ) - else: - media_success = True - return meta_success and media_success + with job_lock: + __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) + __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + if __metaid__ in self._job_view: + meta_success = all( + task.state in ["completed"] for task in self._job_view[__metaid__].tasks + ) + else: + meta_success = True + if __mediaid__ in self._job_view: + media_success = all( + task.state in ["completed"] for task in self._job_view[__mediaid__].tasks + ) + else: + media_success = True + return meta_success and media_success def get_all_torrent_hashes(self) -> set[str]: """ @@ -337,74 +368,82 @@ class JobManager: """ 判断作业是否还有任务正在处理 """ - if mediainfo: - __mediaid__ = self.__get_media_id(media=mediainfo, season=season) - if __mediaid__ in self._job_view: - return True + with job_lock: + if mediainfo: + __mediaid__ = self.__get_media_id(media=mediainfo, season=season) + if __mediaid__ in self._job_view: + return True - __metaid__ = self.__get_meta_id(meta=meta, season=season) - return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0 + __metaid__ = self.__get_meta_id(meta=meta, season=season) + return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0 def success_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]: """ 获取作业中所有成功的任务 """ - __mediaid__ = self.__get_media_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return [] - return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"] + with job_lock: + __mediaid__ = self.__get_media_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return [] + return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"] def all_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]: """ 获取作业中全部任务 """ - __mediaid__ = self.__get_media_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return [] - return self._job_view[__mediaid__].tasks + with job_lock: + __mediaid__ = self.__get_media_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return [] + return self._job_view[__mediaid__].tasks def count(self, media: MediaInfo, season: Optional[int] = None) -> int: """ 获取作业中成功总数 """ - __mediaid__ = self.__get_media_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return 0 - return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]) + with job_lock: + __mediaid__ = self.__get_media_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return 0 + return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]) def size(self, media: MediaInfo, season: Optional[int] = None) -> int: """ 获取作业中所有成功文件总大小 """ - __mediaid__ = self.__get_media_id(media=media, season=season) - if __mediaid__ not in self._job_view: - return 0 - return sum([ - task.fileitem.size if task.fileitem.size is not None - else ( - SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0) - for task in self._job_view[__mediaid__].tasks - if task.state == "completed" - ]) + with job_lock: + __mediaid__ = self.__get_media_id(media=media, season=season) + if __mediaid__ not in self._job_view: + return 0 + return sum([ + task.fileitem.size if task.fileitem.size is not None + else ( + SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0) + for task in self._job_view[__mediaid__].tasks + if task.state == "completed" + ]) def total(self) -> int: """ 获取所有任务总数 """ - return sum([len(job.tasks) for job in self._job_view.values()]) + with job_lock: + return sum([len(job.tasks) for job in self._job_view.values()]) def list_jobs(self) -> List[TransferJob]: """ 获取所有作业的任务列表 """ - return list(self._job_view.values()) + with job_lock: + return list(self._job_view.values()) def season_episodes(self, media: MediaInfo, season: Optional[int] = None) -> List[int]: """ 获取作业的季集清单 """ - __mediaid__ = self.__get_media_id(media=media, season=season) - return self._season_episodes.get(__mediaid__) or [] + with job_lock: + __mediaid__ = self.__get_media_id(media=media, season=season) + return self._season_episodes.get(__mediaid__) or [] class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): @@ -1006,8 +1045,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): finally: # 移除已完成的任务 - if self.jobview.is_done(task): - self.jobview.remove_job(task) + self.jobview.try_remove_job(task) def get_queue_tasks(self) -> List[TransferJob]: """