diff --git a/app/monitor.py b/app/monitor.py index f8245626..4f9f5886 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -1,3 +1,4 @@ +import datetime import platform import queue import re @@ -30,6 +31,7 @@ from app.log import logger from app.schemas import FileItem, TransferInfo, Notification from app.schemas.types import SystemConfigKey, MediaType, NotificationType, EventType from app.utils.singleton import Singleton +from app.utils.string import StringUtils lock = Lock() snapshot_lock = Lock() @@ -87,6 +89,12 @@ class Monitor(metaclass=Singleton): # 文件整理间隔(秒) _transfer_interval = 60 + # 消息汇总 + _msg_medias = {} + + # 消息汇总间隔(秒) + _msg_interval = 60 + def __init__(self): super().__init__() self.chain = MonitorChain() @@ -133,6 +141,9 @@ class Monitor(metaclass=Singleton): self.systemmessage.put(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控", title="目录监控") continue + # 启动定时服务进程 + self._scheduler = BackgroundScheduler(timezone=settings.TZ) + # 启动监控 if mon_dir.storage == "local": # 本地目录监控 @@ -159,8 +170,6 @@ class Monitor(metaclass=Singleton): logger.error(f"{mon_path} 启动目录监控失败:{err_msg}") self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控") else: - # 启动定时服务进程 - self._scheduler = BackgroundScheduler(timezone=settings.TZ) # 远程目录监控 self._scheduler.add_job(self.polling_observer, 'interval', minutes=self._snapshot_interval, kwargs={ @@ -168,6 +177,9 @@ class Monitor(metaclass=Singleton): 'mon_path': mon_path }) + # 追加入库消息统一发送服务 + self._scheduler.add_job(self.__send_msg, trigger='interval', seconds=15) + @staticmethod def __choose_observer() -> Any: """ @@ -442,10 +454,8 @@ class Monitor(metaclass=Singleton): 'transferinfo': transferinfo }) - # TODO 汇总发送成功消息 - self.transferchain.send_transfer_message(meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo) + # 发送消息汇总 + self.__collect_msg_medias(mediainfo=mediainfo, file_meta=file_meta, transferinfo=transferinfo) # 移动模式删除空目录 if dir_info.transfer_type in ["move"]: @@ -455,6 +465,115 @@ class Monitor(metaclass=Singleton): except Exception as e: logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) + def __collect_msg_medias(self, mediainfo: MediaInfo, file_meta: MetaInfoPath, transferinfo: TransferInfo): + """ + 收集媒体处理完的消息 + """ + media_list = self._msg_medias.get(mediainfo.title_year + " " + file_meta.season) or {} + if media_list: + media_files = media_list.get("files") or [] + if media_files: + file_exists = False + for file in media_files: + if str(file_meta.path) == file.get("path"): + file_exists = True + break + if not file_exists: + media_files.append({ + "path": str(file_meta.path), + "mediainfo": mediainfo, + "file_meta": file_meta, + "transferinfo": transferinfo + }) + else: + media_files = [ + { + "path": str(file_meta.path), + "mediainfo": mediainfo, + "file_meta": file_meta, + "transferinfo": transferinfo + } + ] + media_list = { + "files": media_files, + "time": datetime.datetime.now() + } + else: + media_list = { + "files": [ + { + "path": str(file_meta.path), + "mediainfo": mediainfo, + "file_meta": file_meta, + "transferinfo": transferinfo + } + ], + "time": datetime.datetime.now() + } + self._msg_medias[mediainfo.title_year + " " + file_meta.season] = media_list + + def __send_msg(self): + """ + 定时检查是否有媒体处理完,发送统一消息 + """ + if not self._msg_medias or not self._msg_medias.keys(): + return + + # 遍历检查是否已刮削完,发送消息 + for medis_title_year_season in list(self._msg_medias.keys()): + media_list = self._msg_medias.get(medis_title_year_season) + logger.info(f"开始处理媒体 {medis_title_year_season} 消息") + + if not media_list: + continue + + # 获取最后更新时间 + last_update_time = media_list.get("time") + media_files = media_list.get("files") + if not last_update_time or not media_files: + continue + + transferinfo = media_files[0].get("transferinfo") + file_meta = media_files[0].get("file_meta") + mediainfo = media_files[0].get("mediainfo") + # 判断剧集最后更新时间距现在是已超过10秒或者电影,发送消息 + if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._msg_interval) \ + or mediainfo.type == MediaType.MOVIE: + + # 汇总处理文件总大小 + total_size = 0 + file_count = 0 + + # 剧集汇总 + episodes = [] + for file in media_files: + transferinfo = file.get("transferinfo") + total_size += transferinfo.total_size + file_count += 1 + + file_meta = file.get("file_meta") + if file_meta and file_meta.begin_episode: + episodes.append(file_meta.begin_episode) + + transferinfo.total_size = total_size + # 汇总处理文件数量 + transferinfo.file_count = file_count + + # 剧集季集信息 S01 E01-E04 || S01 E01、E02、E04 + season_episode = None + # 处理文件多,说明是剧集,显示季入库消息 + if mediainfo.type == MediaType.TV: + # 季集文本 + season_episode = f"{file_meta.season} {StringUtils.format_ep(episodes)}" + # 发送消息 + self.transferchain.send_transfer_message(meta=file_meta, + mediainfo=mediainfo, + transferinfo=transferinfo, + season_episode=season_episode) + # 发送完消息,移出key + del self._msg_medias[medis_title_year_season] + continue + def stop(self): """ 退出插件