From 39756512ae7921929c3a6c2503cd536f7f712562 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 9 Mar 2025 19:34:05 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=8F=91=E9=80=81=E6=97=B6=E9=97=B4=E8=8C=83=E5=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/__init__.py | 13 +++-- app/helper/message.py | 126 +++++++++++++++++++++++++++++++++++++++++- app/schemas/types.py | 2 + 3 files changed, 135 insertions(+), 6 deletions(-) diff --git a/app/chain/__init__.py b/app/chain/__init__.py index d3814d15..fb7bfc2e 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -16,7 +16,7 @@ from app.core.meta import MetaBase from app.core.module import ModuleManager from app.db.message_oper import MessageOper from app.db.user_oper import UserOper -from app.helper.message import MessageHelper +from app.helper.message import MessageHelper, MessageQueueManager from app.helper.service import ServiceConfigHelper from app.log import logger from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \ @@ -38,6 +38,9 @@ class ChainBase(metaclass=ABCMeta): self.eventmanager = EventManager() self.messageoper = MessageOper() self.messagehelper = MessageHelper() + self.messagequeue = MessageQueueManager( + send_callback=self.run_module + ) self.useroper = UserOper() @staticmethod @@ -545,13 +548,13 @@ class ChainBase(metaclass=ABCMeta): # 按设定发送 self.eventmanager.send_event(etype=EventType.NoticeMessage, data={**send_message.dict(), "type": send_message.mtype}) - self.run_module("post_message", message=send_message) + self.messagequeue.send_message("post_message", message=send_message) if not send_orignal: return # 发送消息事件 self.eventmanager.send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype}) # 按原消息发送 - self.run_module("post_message", message=message) + self.messagequeue.send_message("post_message", message=message) def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None: """ @@ -563,7 +566,7 @@ class ChainBase(metaclass=ABCMeta): note_list = [media.to_dict() for media in medias] self.messagehelper.put(message, role="user", note=note_list, title=message.title) self.messageoper.add(**message.dict(), note=note_list) - return self.run_module("post_medias_message", message=message, medias=medias) + return self.messagequeue.send_message("post_medias_message", message=message, medias=medias) def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None: """ @@ -575,7 +578,7 @@ class ChainBase(metaclass=ABCMeta): note_list = [torrent.torrent_info.to_dict() for torrent in torrents] self.messagehelper.put(message, role="user", note=note_list, title=message.title) self.messageoper.add(**message.dict(), note=note_list) - return self.run_module("post_torrents_message", message=message, torrents=torrents) + return self.messagequeue.send_message("post_torrents_message", message=message, torrents=torrents) def metadata_img(self, mediainfo: MediaInfo, season: int = None, episode: int = None) -> Optional[dict]: """ diff --git a/app/helper/message.py b/app/helper/message.py index 3cd0c2da..5f2ac682 100644 --- a/app/helper/message.py +++ b/app/helper/message.py @@ -1,9 +1,133 @@ +from __future__ import annotations + import json import queue +import threading import time -from typing import Optional, Any, Union +from datetime import datetime +from typing import Any, Union +from typing import List, Dict, Optional, Callable from app.utils.singleton import Singleton +from core.config import global_vars +from db.systemconfig_oper import SystemConfigOper +from log import logger +from schemas.types import SystemConfigKey + + +class MessageQueueManager(metaclass=Singleton): + """ + 消息发送队列管理器 + """ + def __init__( + self, + send_callback: Optional[Callable] = None, + check_interval: int = 10 + ) -> None: + """ + 消息队列管理器初始化 + + :param send_callback: 实际发送消息的回调函数 + :param check_interval: 时间检查间隔(秒) + """ + self.schedule_periods = self._parse_schedule( + SystemConfigOper().get(SystemConfigKey.NotificationSendTime) + ) + self.queue: queue.Queue[Any] = queue.Queue() + self.send_callback = send_callback + self.check_interval = check_interval + + self._running = True + self.thread = threading.Thread(target=self._monitor_loop, daemon=True) + self.thread.start() + + @staticmethod + def _parse_schedule(periods: List[Dict[str, str]]) -> List[tuple[int, int, int, int]]: + """ + 将字符串时间格式转换为分钟数元组 + """ + parsed = [] + if not periods: + return parsed + for period in periods: + start_h, start_m = map(int, period['start'].split(':')) + end_h, end_m = map(int, period['end'].split(':')) + parsed.append((start_h, start_m, end_h, end_m)) + return parsed + + @staticmethod + def _time_to_minutes(time_str: str) -> int: + """ + 将 'HH:MM' 格式转换为分钟数 + """ + hours, minutes = map(int, time_str.split(':')) + return hours * 60 + minutes + + def _is_in_scheduled_time(self, current_time: datetime) -> bool: + """ + 检查当前时间是否在允许发送的时间段内 + """ + if not self.schedule_periods: + return True + current_minutes = current_time.hour * 60 + current_time.minute + for period in self.schedule_periods: + s_h, s_m, e_h, e_m = period + start = s_h * 60 + s_m + end = e_h * 60 + e_m + + if start <= end: + if start <= current_minutes <= end: + return True + else: + if current_minutes >= start or current_minutes <= end: + return True + return False + + def send_message(self, *args, **kwargs) -> None: + """ + 发送消息(立即发送或加入队列) + """ + if self._is_in_scheduled_time(datetime.now()): + self._send(*args, **kwargs) + else: + self.queue.put({ + "args": args, + "kwargs": kwargs + }) + logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}") + + def _send(self, *args, **kwargs) -> None: + """ + 实际发送消息(可通过回调函数自定义) + """ + if self.send_callback: + try: + self.send_callback(*args, **kwargs) + except Exception as e: + logger.error(str(e)) + + def _monitor_loop(self) -> None: + """后台线程循环检查时间并处理队列""" + while self._running: + current_time = datetime.now() + if self._is_in_scheduled_time(current_time): + while not self.queue.empty(): + if global_vars.is_system_stopped: + break + if not self._is_in_scheduled_time(datetime.now()): + break + try: + message = self.queue.get_nowait() + self._send(*message['args'], **message['kwargs']) + logger.info(f"队列剩余消息:{self.queue.qsize()}") + except queue.Empty: + break + time.sleep(self.check_interval) + + def stop(self) -> None: + """停止队列管理器""" + self._running = False + self.thread.join() class MessageHelper(metaclass=Singleton): diff --git a/app/schemas/types.py b/app/schemas/types.py index f177f8e7..7e4e5805 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -147,6 +147,8 @@ class SystemConfigKey(Enum): UserSiteAuthParams = "UserSiteAuthParams" # Follow订阅分享者 FollowSubscribers = "FollowSubscribers" + # 通知发送时间 + NotificationSendTime = "NotificationSendTime" # 处理进度Key字典