diff --git a/app/chain/transfer.py b/app/chain/transfer.py index bdddbf12..43619d04 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -374,13 +374,20 @@ class TransferChain(ChainBase, metaclass=Singleton): # 待整理任务队列 self._queue = queue.Queue() # 文件整理线程 - self._transfer_thread = None + self._transfer_threads = [] # 队列间隔时间(秒) self._transfer_interval = 15 # 事件管理器 self.jobview = JobManager() # 转移成功的文件清单 self._success_target_files: Dict[str, List[str]] = {} + # 整理进度进度 + self._progress = ProgressHelper(ProgressKey.FileTransfer) + # 队列相关状态 + self._active_tasks = 0 + self._processed_num = 0 + self._fail_num = 0 + self._total_num = 0 # 启动整理任务 self.__init() @@ -389,8 +396,11 @@ class TransferChain(ChainBase, metaclass=Singleton): 初始化 """ # 启动文件整理线程 - self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) - self._transfer_thread.start() + for i in range(settings.TRANSFER_THREADS): + thread = threading.Thread(target=self.__start_transfer, + name=f"transfer-{i}", + daemon=True) + thread.start() def __default_callback(self, task: TransferTask, transferinfo: TransferInfo, /) -> Tuple[bool, str]: @@ -555,75 +565,81 @@ class TransferChain(ChainBase, metaclass=Singleton): """ 处理队列 """ - # 队列开始标识 - __queue_start = True - # 任务总数 - total_num = 0 - # 已处理总数 - processed_num = 0 - # 失败数量 - fail_num = 0 - - progress = ProgressHelper(ProgressKey.FileTransfer) - while not global_vars.is_system_stopped: try: - item: TransferQueue = self._queue.get(block=False) - if item: - task = item.task - if not task: - continue - # 文件信息 - fileitem = task.fileitem - # 开始新队列 - if __queue_start: + item: TransferQueue = self._queue.get(block=True, timeout=self._transfer_interval) + if not item: + continue + + task = item.task + if not task: + self._queue.task_done() + continue + + # 文件信息 + fileitem = task.fileitem + + with task_lock: + # 如果当前没有在运行的任务且处理数为0,说明是一个新序列的开始 + if self._active_tasks == 0 and self._processed_num == 0: logger.info("开始整理队列处理...") # 启动进度 - progress.start() + self._progress.start() # 重置计数 - processed_num = 0 - fail_num = 0 - total_num = self.jobview.total() - __process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..." + self._processed_num = 0 + self._fail_num = 0 + self._total_num = self.jobview.total() + __process_msg = f"开始整理队列处理,当前共 {self._total_num} 个文件 ..." logger.info(__process_msg) - progress.update(value=0, - text=__process_msg) - # 队列已开始 - __queue_start = False + self._progress.update(value=0, + text=__process_msg) + # 增加运行中的任务数 + self._active_tasks += 1 + + try: # 更新进度 __process_msg = f"正在整理 {fileitem.name} ..." logger.info(__process_msg) - progress.update(value=processed_num / total_num * 100, - text=__process_msg, - data={}) + with task_lock: + self._progress.update(value=(self._processed_num / self._total_num * 100) if self._total_num else 0, + text=__process_msg) # 整理 state, err_msg = self.__handle_transfer(task=task, callback=item.callback) - if not state: - # 任务失败 - fail_num += 1 - # 更新进度 - processed_num += 1 - __process_msg = f"{fileitem.name} 整理完成" - logger.info(__process_msg) - progress.update(value=(processed_num / total_num) * 100, - text=__process_msg, - data={}) - except queue.Empty: - if not __queue_start: - # 结束进度 - __end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num} 个" - logger.info(__end_msg) - progress.update(value=100, - text=__end_msg) - progress.end() - # 重置计数 - processed_num = 0 - fail_num = 0 - # 标记为新队列 - __queue_start = True + with task_lock: + if not state: + # 任务失败 + self._fail_num += 1 + # 更新进度 + self._processed_num += 1 + __process_msg = f"{fileitem.name} 整理完成" + logger.info(__process_msg) + self._progress.update(value=(self._processed_num / self._total_num * 100) if self._total_num else 100, + text=__process_msg) + except Exception as e: + logger.error(f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}") + with task_lock: + self._processed_num += 1 + self._fail_num += 1 + finally: + self._queue.task_done() + with task_lock: + # 减少运行中的任务数 + self._active_tasks -= 1 + # 检查是否所有任务都已完成且队列为空 + if self._active_tasks == 0 and self._queue.empty(): + # 结束进度 + __end_msg = f"整理队列处理完成,共整理 {self._processed_num} 个文件,失败 {self._fail_num} 个" + logger.info(__end_msg) + self._progress.update(value=100, + text=__end_msg) + self._progress.end() + # 重置计数 + self._processed_num = 0 + self._fail_num = 0 - # 等待一定时间,以让其他任务加入队列 - sleep(self._transfer_interval) + except queue.Empty: + # 即使队列空了,如果还有任务在运行,也不应该结束进度 + # 这部分逻辑已经在 finally 的 active_tasks == 0 中处理了 continue except Exception as e: logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}") diff --git a/app/core/config.py b/app/core/config.py index 401a9857..a8c22b2b 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -303,6 +303,8 @@ class ConfigModel(BaseModel): COOKIECLOUD_BLACKLIST: Optional[str] = None # ==================== 整理配置 ==================== + # 文件整理线程数 + TRANSFER_THREADS: int = 5 # 电影重命名格式 MOVIE_RENAME_FORMAT: str = "{{title}}{% if year %} ({{year}}){% endif %}" \ "/{{title}}{% if year %} ({{year}}){% endif %}{% if part %}-{{part}}{% endif %}{% if videoFormat %} - {{videoFormat}}{% endif %}" \