feat:TRANSFER_THREADS 变更监听

This commit is contained in:
jxxghp
2026-01-21 20:46:34 +08:00
parent 58030bbcff
commit a1408ee18f

View File

@@ -30,6 +30,7 @@ from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeForm
TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey, ChainEventType, ContentType
from app.utils.mixins import ConfigReloadMixin
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
@@ -359,11 +360,15 @@ class JobManager:
return self._season_episodes.get(__mediaid__) or []
class TransferChain(ChainBase, metaclass=Singleton):
class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"""
文件整理处理链
"""
CONFIG_WATCH = {
"TRANSFER_THREADS",
}
def __init__(self):
super().__init__()
# 主要媒体文件后缀
@@ -385,6 +390,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 整理进度进度
self._progress = ProgressHelper(ProgressKey.FileTransfer)
# 队列相关状态
self._threads = []
self._queue_active = False
self._active_tasks = 0
self._processed_num = 0
self._fail_num = 0
@@ -394,15 +401,31 @@ class TransferChain(ChainBase, metaclass=Singleton):
def __init(self):
"""
初始化
启动文件整理线程
"""
# 启动文件整理线程
self._queue_active = True
for i in range(settings.TRANSFER_THREADS):
logger.info(f"启动文件整理线程 {i+1} ...")
thread = threading.Thread(target=self.__start_transfer,
name=f"transfer-{i}",
daemon=True)
self._threads.append(thread)
thread.start()
def __stop(self):
"""
停止文件整理进程
"""
self._queue_active = False
for thread in self._threads:
thread.join()
self._threads = []
logger.info("文件整理线程已停止")
def on_config_changed(self):
self.__stop()
self.__init()
def __is_allowed_file(self, fileitem: FileItem) -> bool:
"""
判断是否允许的扩展名
@@ -608,7 +631,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
处理队列
"""
while not global_vars.is_system_stopped:
while not global_vars.is_system_stopped and self._queue_active:
try:
item: TransferQueue = self._queue.get(block=True, timeout=self._transfer_interval)
if not item: