Merge pull request #2888 from thsrite/dev

This commit is contained in:
jxxghp
2024-10-21 13:35:04 +08:00
committed by GitHub

View File

@@ -1,3 +1,4 @@
import datetime
import platform
import queue
import re
@@ -30,6 +31,7 @@ from app.log import logger
from app.schemas import FileItem, TransferInfo, Notification
from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
lock = Lock()
snapshot_lock = Lock()
@@ -87,6 +89,12 @@ class Monitor(metaclass=Singleton):
# 文件整理间隔(秒)
_transfer_interval = 60
# 消息汇总
_msg_medias = {}
# 消息汇总间隔(秒)
_msg_interval = 60
def __init__(self):
super().__init__()
self.chain = MonitorChain()
@@ -133,6 +141,9 @@ class Monitor(metaclass=Singleton):
self.systemmessage.put(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控", title="目录监控")
continue
# 启动定时服务进程
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 启动监控
if mon_dir.storage == "local":
# 本地目录监控
@@ -159,8 +170,6 @@ class Monitor(metaclass=Singleton):
logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控")
else:
# 启动定时服务进程
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 远程目录监控
self._scheduler.add_job(self.polling_observer, 'interval', minutes=self._snapshot_interval,
kwargs={
@@ -168,6 +177,9 @@ class Monitor(metaclass=Singleton):
'mon_path': mon_path
})
# 追加入库消息统一发送服务
self._scheduler.add_job(self.__send_msg, trigger='interval', seconds=15)
@staticmethod
def __choose_observer() -> Any:
"""
@@ -442,10 +454,8 @@ class Monitor(metaclass=Singleton):
'transferinfo': transferinfo
})
# TODO 汇总发送成功消息
self.transferchain.send_transfer_message(meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo)
# 发送消息汇总
self.__collect_msg_medias(mediainfo=mediainfo, file_meta=file_meta, transferinfo=transferinfo)
# 移动模式删除空目录
if dir_info.transfer_type in ["move"]:
@@ -455,6 +465,115 @@ class Monitor(metaclass=Singleton):
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))
def __collect_msg_medias(self, mediainfo: MediaInfo, file_meta: MetaInfoPath, transferinfo: TransferInfo):
"""
收集媒体处理完的消息
"""
media_list = self._msg_medias.get(mediainfo.title_year + " " + file_meta.season) or {}
if media_list:
media_files = media_list.get("files") or []
if media_files:
file_exists = False
for file in media_files:
if str(file_meta.path) == file.get("path"):
file_exists = True
break
if not file_exists:
media_files.append({
"path": str(file_meta.path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
})
else:
media_files = [
{
"path": str(file_meta.path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
]
media_list = {
"files": media_files,
"time": datetime.datetime.now()
}
else:
media_list = {
"files": [
{
"path": str(file_meta.path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
],
"time": datetime.datetime.now()
}
self._msg_medias[mediainfo.title_year + " " + file_meta.season] = media_list
def __send_msg(self):
"""
定时检查是否有媒体处理完,发送统一消息
"""
if not self._msg_medias or not self._msg_medias.keys():
return
# 遍历检查是否已刮削完,发送消息
for medis_title_year_season in list(self._msg_medias.keys()):
media_list = self._msg_medias.get(medis_title_year_season)
logger.info(f"开始处理媒体 {medis_title_year_season} 消息")
if not media_list:
continue
# 获取最后更新时间
last_update_time = media_list.get("time")
media_files = media_list.get("files")
if not last_update_time or not media_files:
continue
transferinfo = media_files[0].get("transferinfo")
file_meta = media_files[0].get("file_meta")
mediainfo = media_files[0].get("mediainfo")
# 判断剧集最后更新时间距现在是已超过10秒或者电影发送消息
if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._msg_interval) \
or mediainfo.type == MediaType.MOVIE:
# 汇总处理文件总大小
total_size = 0
file_count = 0
# 剧集汇总
episodes = []
for file in media_files:
transferinfo = file.get("transferinfo")
total_size += transferinfo.total_size
file_count += 1
file_meta = file.get("file_meta")
if file_meta and file_meta.begin_episode:
episodes.append(file_meta.begin_episode)
transferinfo.total_size = total_size
# 汇总处理文件数量
transferinfo.file_count = file_count
# 剧集季集信息 S01 E01-E04 || S01 E01、E02、E04
season_episode = None
# 处理文件多,说明是剧集,显示季入库消息
if mediainfo.type == MediaType.TV:
# 季集文本
season_episode = f"{file_meta.season} {StringUtils.format_ep(episodes)}"
# 发送消息
self.transferchain.send_transfer_message(meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo,
season_episode=season_episode)
# 发送完消息移出key
del self._msg_medias[medis_title_year_season]
continue
def stop(self):
"""
退出插件