from enum import Enum from typing import Union, Optional from app.core.cache import TTLCache from app.schemas.types import ProgressKey from app.utils.singleton import WeakSingleton class ProgressHelper(metaclass=WeakSingleton): """ 处理进度辅助类 """ def __init__(self, key: Union[ProgressKey, str]): if isinstance(key, Enum): key = key.value self._key = key self._progress = TTLCache(maxsize=1024, ttl=24 * 60 * 60) def __reset(self): self._progress[self._key] = { "enable": False, "value": 0, "text": "请稍候..." } def start(self): self.__reset() self._progress[self._key]['enable'] = True def end(self): if not self._progress.get(self._key): return self._progress[self._key] = { "enable": False, "value": 100, "text": "正在处理..." } def update(self, value: Union[float, int] = None, text: Optional[str] = None): if not self._progress.get(self._key).get('enable'): return if value: self._progress[self._key]['value'] = value if text: self._progress[self._key]['text'] = text def get(self) -> dict: return self._progress.get(self._key)