From 71a9fe10f4e270c089dfe10be8e7cf38377994ea Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 24 Aug 2025 09:02:55 +0800 Subject: [PATCH] refactor ProgressHelper --- app/api/endpoints/storage.py | 15 +++--- app/api/endpoints/system.py | 4 +- app/chain/search.py | 55 ++++++++------------ app/chain/transfer.py | 42 ++++++--------- app/helper/progress.py | 50 ++++++++---------- app/modules/filemanager/storages/__init__.py | 30 ++++++++++- 6 files changed, 99 insertions(+), 97 deletions(-) diff --git a/app/api/endpoints/storage.py b/app/api/endpoints/storage.py index 6e500e16..8f9b64b0 100644 --- a/app/api/endpoints/storage.py +++ b/app/api/endpoints/storage.py @@ -171,15 +171,14 @@ def rename(fileitem: schemas.FileItem, sub_files: List[schemas.FileItem] = StorageChain().list_files(fileitem) if sub_files: # 开始进度 - progress = ProgressHelper() - progress.start(ProgressKey.BatchRename) + progress = ProgressHelper(ProgressKey.BatchRename) + progress.start() total = len(sub_files) handled = 0 for sub_file in sub_files: handled += 1 progress.update(value=handled / total * 100, - text=f"正在处理 {sub_file.name} ...", - key=ProgressKey.BatchRename) + text=f"正在处理 {sub_file.name} ...") if sub_file.type == "dir": continue if not sub_file.extension: @@ -190,19 +189,19 @@ def rename(fileitem: schemas.FileItem, meta = MetaInfoPath(sub_path) mediainfo = transferchain.recognize_media(meta) if not mediainfo: - progress.end(ProgressKey.BatchRename) + progress.end() return schemas.Response(success=False, message=f"{sub_path.name} 未识别到媒体信息") new_path = transferchain.recommend_name(meta=meta, mediainfo=mediainfo) if not new_path: - progress.end(ProgressKey.BatchRename) + progress.end() return schemas.Response(success=False, message=f"{sub_path.name} 未识别到新名称") ret: schemas.Response = rename(fileitem=sub_file, new_name=Path(new_path).name, recursive=False) if not ret.success: - progress.end(ProgressKey.BatchRename) + progress.end() return schemas.Response(success=False, message=f"{sub_path.name} 重命名失败!") - progress.end(ProgressKey.BatchRename) + progress.end() # 重命名自己 result = StorageChain().rename_file(fileitem, new_name) if result: diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index eae507b7..f2fde72e 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -254,14 +254,14 @@ async def get_progress(request: Request, process_type: str, _: schemas.TokenPayl """ 实时获取处理进度,返回格式为SSE """ - progress = ProgressHelper() + progress = ProgressHelper(process_type) async def event_generator(): try: while not global_vars.is_system_stopped: if await request.is_disconnected(): break - detail = progress.get(process_type) + detail = progress.get() yield f"data: {json.dumps(detail)}\n\n" await asyncio.sleep(0.5) except asyncio.CancelledError: diff --git a/app/chain/search.py b/app/chain/search.py index f76edb66..b22b418a 100644 --- a/app/chain/search.py +++ b/app/chain/search.py @@ -215,12 +215,11 @@ class SearchChain(ChainBase): return [] # 开始新进度 - progress = ProgressHelper() - progress.start(ProgressKey.Search) + progress = ProgressHelper(ProgressKey.Search) + progress.start() # 开始过滤 - progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...', - key=ProgressKey.Search) + progress.update(value=0, text=f'开始过滤,总 {len(torrents)} 个资源,请稍候...') # 匹配订阅附加参数 if filter_params: logger.info(f'开始附加参数过滤,附加参数:{filter_params} ...') @@ -238,7 +237,7 @@ class SearchChain(ChainBase): logger.info(f"过滤规则/剧集过滤完成,剩余 {len(torrents)} 个资源") # 过滤完成 - progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源', key=ProgressKey.Search) + progress.update(value=50, text=f'过滤完成,剩余 {len(torrents)} 个资源') # 总数 _total = len(torrents) @@ -251,14 +250,13 @@ class SearchChain(ChainBase): try: # 英文标题应该在别名/原标题中,不需要再匹配 logger.info(f"开始匹配结果 标题:{mediainfo.title},原标题:{mediainfo.original_title},别名:{mediainfo.names}") - progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search) + progress.update(value=51, text=f'开始匹配,总 {_total} 个资源 ...') for torrent in torrents: if global_vars.is_system_stopped: break _count += 1 progress.update(value=(_count / _total) * 96, - text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...', - key=ProgressKey.Search) + text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...') if not torrent.title: continue @@ -291,8 +289,7 @@ class SearchChain(ChainBase): # 匹配完成 logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源") progress.update(value=97, - text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源', - key=ProgressKey.Search) + text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源') # 去掉mediainfo中多余的数据 mediainfo.clear() @@ -308,16 +305,14 @@ class SearchChain(ChainBase): # 排序 progress.update(value=99, - text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...', - key=ProgressKey.Search) + text=f'正在对 {len(contexts)} 个资源进行排序,请稍候...') contexts = torrenthelper.sort_torrents(contexts) # 结束进度 logger.info(f'搜索完成,共 {len(contexts)} 个资源') progress.update(value=100, - text=f'搜索完成,共 {len(contexts)} 个资源', - key=ProgressKey.Search) - progress.end(ProgressKey.Search) + text=f'搜索完成,共 {len(contexts)} 个资源') + progress.end() # 去重后返回 return self.__remove_duplicate(contexts) @@ -521,8 +516,8 @@ class SearchChain(ChainBase): return [] # 开始进度 - progress = ProgressHelper() - progress.start(ProgressKey.Search) + progress = ProgressHelper(ProgressKey.Search) + progress.start() # 开始计时 start_time = datetime.now() # 总数 @@ -531,8 +526,7 @@ class SearchChain(ChainBase): finish_count = 0 # 更新进度 progress.update(value=0, - text=f"开始搜索,共 {total_num} 个站点 ...", - key=ProgressKey.Search) + text=f"开始搜索,共 {total_num} 个站点 ...") # 结果集 results = [] # 多线程 @@ -561,17 +555,15 @@ class SearchChain(ChainBase): results.extend(result) logger.info(f"站点搜索进度:{finish_count} / {total_num}") progress.update(value=finish_count / total_num * 100, - text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...", - key=ProgressKey.Search) + text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...") # 计算耗时 end_time = datetime.now() # 更新进度 progress.update(value=100, - text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒", - key=ProgressKey.Search) + text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") # 结束进度 - progress.end(ProgressKey.Search) + progress.end() # 返回 return results @@ -606,8 +598,8 @@ class SearchChain(ChainBase): return [] # 开始进度 - progress = ProgressHelper() - progress.start(ProgressKey.Search) + progress = ProgressHelper(ProgressKey.Search) + progress.start() # 开始计时 start_time = datetime.now() # 总数 @@ -616,8 +608,7 @@ class SearchChain(ChainBase): finish_count = 0 # 更新进度 progress.update(value=0, - text=f"开始搜索,共 {total_num} 个站点 ...", - key=ProgressKey.Search) + text=f"开始搜索,共 {total_num} 个站点 ...") # 结果集 results = [] @@ -648,18 +639,16 @@ class SearchChain(ChainBase): results.extend(result) logger.info(f"站点搜索进度:{finish_count} / {total_num}") progress.update(value=finish_count / total_num * 100, - text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...", - key=ProgressKey.Search) + text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个站点 ...") # 计算耗时 end_time = datetime.now() # 更新进度 progress.update(value=100, - text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒", - key=ProgressKey.Search) + text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") # 结束进度 - progress.end(ProgressKey.Search) + progress.end() # 返回 return results diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 280a575a..ece2fac4 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -556,7 +556,7 @@ class TransferChain(ChainBase, metaclass=Singleton): # 失败数量 fail_num = 0 - progress = ProgressHelper() + progress = ProgressHelper(ProgressKey.FileTransfer) while not global_vars.is_system_stopped: try: @@ -571,7 +571,7 @@ class TransferChain(ChainBase, metaclass=Singleton): if __queue_start: logger.info("开始整理队列处理...") # 启动进度 - progress.start(ProgressKey.FileTransfer) + progress.start() # 重置计数 processed_num = 0 fail_num = 0 @@ -579,16 +579,14 @@ class TransferChain(ChainBase, metaclass=Singleton): __process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..." logger.info(__process_msg) progress.update(value=0, - text=__process_msg, - key=ProgressKey.FileTransfer) + text=__process_msg) # 队列已开始 __queue_start = False # 更新进度 __process_msg = f"正在整理 {fileitem.name} ..." logger.info(__process_msg) progress.update(value=processed_num / total_num * 100, - text=__process_msg, - key=ProgressKey.FileTransfer) + text=__process_msg) # 整理 state, err_msg = self.__handle_transfer(task=task, callback=item.callback) if not state: @@ -599,17 +597,15 @@ class TransferChain(ChainBase, metaclass=Singleton): __process_msg = f"{fileitem.name} 整理完成" logger.info(__process_msg) progress.update(value=processed_num / total_num * 100, - text=__process_msg, - key=ProgressKey.FileTransfer) + text=__process_msg) except queue.Empty: if not __queue_start: # 结束进度 __end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num} 个" logger.info(__end_msg) progress.update(value=100, - text=__end_msg, - key=ProgressKey.FileTransfer) - progress.end(ProgressKey.FileTransfer) + text=__end_msg) + progress.end() # 重置计数 processed_num = 0 fail_num = 0 @@ -1167,13 +1163,12 @@ class TransferChain(ChainBase, metaclass=Singleton): fail_num = 0 # 启动进度 - progress = ProgressHelper() - progress.start(ProgressKey.FileTransfer) + progress = ProgressHelper(ProgressKey.FileTransfer) + progress.start() __process_msg = f"开始整理,共 {total_num} 个文件 ..." logger.info(__process_msg) progress.update(value=0, - text=__process_msg, - key=ProgressKey.FileTransfer) + text=__process_msg) try: for transfer_task in transfer_tasks: if global_vars.is_system_stopped: @@ -1184,8 +1179,7 @@ class TransferChain(ChainBase, metaclass=Singleton): __process_msg = f"正在整理 ({processed_num + fail_num + 1}/{total_num}){transfer_task.fileitem.name} ..." logger.info(__process_msg) progress.update(value=(processed_num + fail_num) / total_num * 100, - text=__process_msg, - key=ProgressKey.FileTransfer) + text=__process_msg) state, err_msg = self.__handle_transfer( task=transfer_task, callback=self.__default_callback @@ -1205,9 +1199,8 @@ class TransferChain(ChainBase, metaclass=Singleton): __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" logger.info(__end_msg) progress.update(value=100, - text=__end_msg, - key=ProgressKey.FileTransfer) - progress.end(ProgressKey.FileTransfer) + text=__end_msg) + progress.end() error_msg = "、".join(err_msgs[:2]) + (f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else "") return all_success, error_msg @@ -1353,11 +1346,10 @@ class TransferChain(ChainBase, metaclass=Singleton): # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) # 开始进度 - progress = ProgressHelper() - progress.start(ProgressKey.FileTransfer) + progress = ProgressHelper(ProgressKey.FileTransfer) + progress.start() progress.update(value=0, - text=f"开始整理 {fileitem.path} ...", - key=ProgressKey.FileTransfer) + text=f"开始整理 {fileitem.path} ...") # 开始整理 state, errmsg = self.do_transfer( fileitem=fileitem, @@ -1378,7 +1370,7 @@ class TransferChain(ChainBase, metaclass=Singleton): if not state: return False, errmsg - progress.end(ProgressKey.FileTransfer) + progress.end() logger.info(f"{fileitem.path} 整理完成") return True, "" else: diff --git a/app/helper/progress.py b/app/helper/progress.py index 54f8d012..78ed414b 100644 --- a/app/helper/progress.py +++ b/app/helper/progress.py @@ -1,55 +1,49 @@ from enum import Enum from typing import Union, Optional +from app.core.cache import TTLCache from app.schemas.types import ProgressKey from app.utils.singleton import WeakSingleton class ProgressHelper(metaclass=WeakSingleton): + """ + 处理进度辅助类 + """ - def __init__(self): - self._process_detail = {} - - def init_config(self): - pass - - def __reset(self, key: Union[ProgressKey, str]): + def __init__(self, key: Union[ProgressKey, str]): if isinstance(key, Enum): key = key.value - self._process_detail[key] = { + self._key = key + self._progress = TTLCache(maxsize=1024, ttl=24 * 60 * 60) + + def __reset(self): + self._progress[self._key] = { "enable": False, "value": 0, "text": "请稍候..." } - def start(self, key: Union[ProgressKey, str]): - self.__reset(key) - if isinstance(key, Enum): - key = key.value - self._process_detail[key]['enable'] = True + def start(self): + self.__reset() + self._progress[self._key]['enable'] = True - def end(self, key: Union[ProgressKey, str]): - if isinstance(key, Enum): - key = key.value - if not self._process_detail.get(key): + def end(self): + if not self._progress.get(self._key): return - self._process_detail[key] = { + self._progress[self._key] = { "enable": False, "value": 100, "text": "正在处理..." } - def update(self, key: Union[ProgressKey, str], value: Union[float, int] = None, text: Optional[str] = None): - if isinstance(key, Enum): - key = key.value - if not self._process_detail.get(key, {}).get('enable'): + def update(self, value: Union[float, int] = None, text: Optional[str] = None): + if not self._progress.get(self._key).get('enable'): return if value: - self._process_detail[key]['value'] = value + self._progress[self._key]['value'] = value if text: - self._process_detail[key]['text'] = text + self._progress[self._key]['text'] = text - def get(self, key: Union[ProgressKey, str]) -> dict: - if isinstance(key, Enum): - key = key.value - return self._process_detail.get(key) + def get(self) -> dict: + return self._progress.get(self._key) diff --git a/app/modules/filemanager/storages/__init__.py b/app/modules/filemanager/storages/__init__.py index 67f602c3..dcc03aae 100644 --- a/app/modules/filemanager/storages/__init__.py +++ b/app/modules/filemanager/storages/__init__.py @@ -1,12 +1,40 @@ from abc import ABCMeta, abstractmethod from pathlib import Path -from typing import Optional, List, Dict, Tuple +from typing import Optional, List, Dict, Tuple, Callable, Union + +from tqdm import tqdm from app import schemas +from app.helper.progress import ProgressHelper from app.helper.storage import StorageHelper from app.log import logger +def transfer_process(path: str) -> Callable[[int | float], None]: + """ + 传输进度回调 + """ + pbar = tqdm(total=100, desc="整理进度", unit="%") + progress = ProgressHelper(path) + progress.start() + + def update_progress(percent: Union[int, float]) -> None: + """ + 更新进度百分比 + """ + percent_value = int(percent) + pbar.n = percent_value + # 更新进度 + pbar.refresh() + progress.update(value=percent_value, text=f"{path} 进度:{percent_value}%") + # 完成时结束 + if percent_value >= 100: + progress.end() + pbar.close() + + return update_progress + + class StorageBase(metaclass=ABCMeta): """ 存储基类