From 270bcff8f354dae26950e7a4a68599857c507106 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 31 Jan 2026 16:38:55 +0000 Subject: [PATCH] Fix task loss issue in do_transfer multi-threading batch adding Co-authored-by: jxxghp <51039935+jxxghp@users.noreply.github.com> --- app/chain/transfer.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/app/chain/transfer.py b/app/chain/transfer.py index cd5f3e17..b019be80 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -111,12 +111,13 @@ class JobManager: """ return schemas.MetaInfo(**task.meta.to_dict()) - def add_task(self, task: TransferTask, state: Optional[str] = "waiting"): + def add_task(self, task: TransferTask, state: Optional[str] = "waiting") -> bool: """ 添加整理任务,自动分组到对应的作业中 + :return: True表示任务已添加,False表示任务已存在(重复) """ if not any([task, task.meta, task.fileitem]): - return + return False with job_lock: __mediaid__ = self.__get_id(task) if __mediaid__ not in self._job_view: @@ -134,7 +135,8 @@ class JobManager: else: # 不重复添加任务 if any([t.fileitem == task.fileitem for t in self._job_view[__mediaid__].tasks]): - return + logger.debug(f"任务 {task.fileitem.name} 已存在,跳过重复添加") + return False self._job_view[__mediaid__].tasks.append( TransferJobTask( fileitem=task.fileitem, @@ -150,6 +152,7 @@ class JobManager: self._season_episodes[__mediaid__] = list(set(self._season_episodes[__mediaid__])) else: self._season_episodes[__mediaid__] = task.meta.episode_list + return True def running_task(self, task: TransferTask): """ @@ -724,26 +727,30 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return ret_status, ret_message - def put_to_queue(self, task: TransferTask): + def put_to_queue(self, task: TransferTask) -> bool: """ 添加到待整理队列 :param task: 任务信息 + :return: True表示任务已添加到队列,False表示任务已存在(重复) """ if not task: - return - # 维护整理任务视图 - self.__put_to_jobview(task) + return False + # 维护整理任务视图,如果任务已存在则不添加到队列 + if not self.__put_to_jobview(task): + return False # 添加到队列 self._queue.put(TransferQueue( task=task, callback=self.__default_callback )) + return True - def __put_to_jobview(self, task: TransferTask): + def __put_to_jobview(self, task: TransferTask) -> bool: """ 添加到作业视图 + :return: True表示任务已添加,False表示任务已存在(重复) """ - self.jobview.add_task(task) + return self.jobview.add_task(task) def remove_from_queue(self, fileitem: FileItem): """ @@ -1357,12 +1364,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): background=background ) if background: - self.put_to_queue(task=transfer_task) - logger.info(f"{file_path.name} 已添加到整理队列") + if self.put_to_queue(task=transfer_task): + logger.info(f"{file_path.name} 已添加到整理队列") + else: + logger.debug(f"{file_path.name} 已在整理队列中,跳过") else: # 加入列表 - self.__put_to_jobview(transfer_task) - transfer_tasks.append(transfer_task) + if self.__put_to_jobview(transfer_task): + transfer_tasks.append(transfer_task) + else: + logger.debug(f"{file_path.name} 已在整理列表中,跳过") finally: file_items.clear() del file_items