feat: 为文件整理服务引入多线程处理并优化进度管理。

This commit is contained in:
jxxghp
2026-01-21 08:16:02 +08:00
parent bf290f063d
commit 85cacd447b
2 changed files with 78 additions and 60 deletions

View File

@@ -374,13 +374,20 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 待整理任务队列 # 待整理任务队列
self._queue = queue.Queue() self._queue = queue.Queue()
# 文件整理线程 # 文件整理线程
self._transfer_thread = None self._transfer_threads = []
# 队列间隔时间(秒) # 队列间隔时间(秒)
self._transfer_interval = 15 self._transfer_interval = 15
# 事件管理器 # 事件管理器
self.jobview = JobManager() self.jobview = JobManager()
# 转移成功的文件清单 # 转移成功的文件清单
self._success_target_files: Dict[str, List[str]] = {} self._success_target_files: Dict[str, List[str]] = {}
# 整理进度进度
self._progress = ProgressHelper(ProgressKey.FileTransfer)
# 队列相关状态
self._active_tasks = 0
self._processed_num = 0
self._fail_num = 0
self._total_num = 0
# 启动整理任务 # 启动整理任务
self.__init() self.__init()
@@ -389,8 +396,11 @@ class TransferChain(ChainBase, metaclass=Singleton):
初始化 初始化
""" """
# 启动文件整理线程 # 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) for i in range(settings.TRANSFER_THREADS):
self._transfer_thread.start() thread = threading.Thread(target=self.__start_transfer,
name=f"transfer-{i}",
daemon=True)
thread.start()
def __default_callback(self, task: TransferTask, def __default_callback(self, task: TransferTask,
transferinfo: TransferInfo, /) -> Tuple[bool, str]: transferinfo: TransferInfo, /) -> Tuple[bool, str]:
@@ -555,75 +565,81 @@ class TransferChain(ChainBase, metaclass=Singleton):
""" """
处理队列 处理队列
""" """
# 队列开始标识
__queue_start = True
# 任务总数
total_num = 0
# 已处理总数
processed_num = 0
# 失败数量
fail_num = 0
progress = ProgressHelper(ProgressKey.FileTransfer)
while not global_vars.is_system_stopped: while not global_vars.is_system_stopped:
try: try:
item: TransferQueue = self._queue.get(block=False) item: TransferQueue = self._queue.get(block=True, timeout=self._transfer_interval)
if item: if not item:
task = item.task continue
if not task:
continue task = item.task
# 文件信息 if not task:
fileitem = task.fileitem self._queue.task_done()
# 开始新队列 continue
if __queue_start:
# 文件信息
fileitem = task.fileitem
with task_lock:
# 如果当前没有在运行的任务且处理数为0说明是一个新序列的开始
if self._active_tasks == 0 and self._processed_num == 0:
logger.info("开始整理队列处理...") logger.info("开始整理队列处理...")
# 启动进度 # 启动进度
progress.start() self._progress.start()
# 重置计数 # 重置计数
processed_num = 0 self._processed_num = 0
fail_num = 0 self._fail_num = 0
total_num = self.jobview.total() self._total_num = self.jobview.total()
__process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..." __process_msg = f"开始整理队列处理,当前共 {self._total_num} 个文件 ..."
logger.info(__process_msg) logger.info(__process_msg)
progress.update(value=0, self._progress.update(value=0,
text=__process_msg) text=__process_msg)
# 队列已开始 # 增加运行中的任务数
__queue_start = False self._active_tasks += 1
try:
# 更新进度 # 更新进度
__process_msg = f"正在整理 {fileitem.name} ..." __process_msg = f"正在整理 {fileitem.name} ..."
logger.info(__process_msg) logger.info(__process_msg)
progress.update(value=processed_num / total_num * 100, with task_lock:
text=__process_msg, self._progress.update(value=(self._processed_num / self._total_num * 100) if self._total_num else 0,
data={}) text=__process_msg)
# 整理 # 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback) state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
if not state: with task_lock:
# 任务失败 if not state:
fail_num += 1 # 任务失败
# 更新进度 self._fail_num += 1
processed_num += 1 # 更新进度
__process_msg = f"{fileitem.name} 整理完成" self._processed_num += 1
logger.info(__process_msg) __process_msg = f"{fileitem.name} 整理完成"
progress.update(value=(processed_num / total_num) * 100, logger.info(__process_msg)
text=__process_msg, self._progress.update(value=(self._processed_num / self._total_num * 100) if self._total_num else 100,
data={}) text=__process_msg)
except queue.Empty: except Exception as e:
if not __queue_start: logger.error(f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}")
# 结束进度 with task_lock:
__end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num}" self._processed_num += 1
logger.info(__end_msg) self._fail_num += 1
progress.update(value=100, finally:
text=__end_msg) self._queue.task_done()
progress.end() with task_lock:
# 重置计 # 减少运行中的任务
processed_num = 0 self._active_tasks -= 1
fail_num = 0 # 检查是否所有任务都已完成且队列为空
# 标记为新队列 if self._active_tasks == 0 and self._queue.empty():
__queue_start = True # 结束进度
__end_msg = f"整理队列处理完成,共整理 {self._processed_num} 个文件,失败 {self._fail_num}"
logger.info(__end_msg)
self._progress.update(value=100,
text=__end_msg)
self._progress.end()
# 重置计数
self._processed_num = 0
self._fail_num = 0
# 等待一定时间,以让其他任务加入队列 except queue.Empty:
sleep(self._transfer_interval) # 即使队列空了,如果还有任务在运行,也不应该结束进度
# 这部分逻辑已经在 finally 的 active_tasks == 0 中处理了
continue continue
except Exception as e: except Exception as e:
logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}") logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}")

View File

@@ -303,6 +303,8 @@ class ConfigModel(BaseModel):
COOKIECLOUD_BLACKLIST: Optional[str] = None COOKIECLOUD_BLACKLIST: Optional[str] = None
# ==================== 整理配置 ==================== # ==================== 整理配置 ====================
# 文件整理线程数
TRANSFER_THREADS: int = 5
# 电影重命名格式 # 电影重命名格式
MOVIE_RENAME_FORMAT: str = "{{title}}{% if year %} ({{year}}){% endif %}" \ MOVIE_RENAME_FORMAT: str = "{{title}}{% if year %} ({{year}}){% endif %}" \
"/{{title}}{% if year %} ({{year}}){% endif %}{% if part %}-{{part}}{% endif %}{% if videoFormat %} - {{videoFormat}}{% endif %}" \ "/{{title}}{% if year %} ({{year}}){% endif %}{% if part %}-{{part}}{% endif %}{% if videoFormat %} - {{videoFormat}}{% endif %}" \