diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 701c3c4a..82181778 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -30,6 +30,7 @@ from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeForm TransferTask, TransferQueue, TransferJob, TransferJobTask from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \ SystemConfigKey, ChainEventType, ContentType +from app.utils.mixins import ConfigReloadMixin from app.utils.singleton import Singleton from app.utils.string import StringUtils from app.utils.system import SystemUtils @@ -359,11 +360,15 @@ class JobManager: return self._season_episodes.get(__mediaid__) or [] -class TransferChain(ChainBase, metaclass=Singleton): +class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ 文件整理处理链 """ + CONFIG_WATCH = { + "TRANSFER_THREADS", + } + def __init__(self): super().__init__() # 主要媒体文件后缀 @@ -385,6 +390,8 @@ class TransferChain(ChainBase, metaclass=Singleton): # 整理进度进度 self._progress = ProgressHelper(ProgressKey.FileTransfer) # 队列相关状态 + self._threads = [] + self._queue_active = False self._active_tasks = 0 self._processed_num = 0 self._fail_num = 0 @@ -394,15 +401,31 @@ class TransferChain(ChainBase, metaclass=Singleton): def __init(self): """ - 初始化 + 启动文件整理线程 """ - # 启动文件整理线程 + self._queue_active = True for i in range(settings.TRANSFER_THREADS): + logger.info(f"启动文件整理线程 {i+1} ...") thread = threading.Thread(target=self.__start_transfer, name=f"transfer-{i}", daemon=True) + self._threads.append(thread) thread.start() + def __stop(self): + """ + 停止文件整理进程 + """ + self._queue_active = False + for thread in self._threads: + thread.join() + self._threads = [] + logger.info("文件整理线程已停止") + + def on_config_changed(self): + self.__stop() + self.__init() + def __is_allowed_file(self, fileitem: FileItem) -> bool: """ 判断是否允许的扩展名 @@ -608,7 +631,7 @@ class TransferChain(ChainBase, metaclass=Singleton): """ 处理队列 """ - while not global_vars.is_system_stopped: + while not global_vars.is_system_stopped and self._queue_active: try: item: TransferQueue = self._queue.get(block=True, timeout=self._transfer_interval) if not item: