mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-02 18:22:39 +08:00
786 lines
27 KiB
Python
786 lines
27 KiB
Python
from __future__ import annotations
|
||
|
||
import ast
|
||
import json
|
||
import queue
|
||
import re
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Any, Literal, Optional, List, Dict, Union
|
||
from typing import Callable
|
||
|
||
from jinja2 import Template
|
||
|
||
from app.core.cache import TTLCache
|
||
from app.core.config import global_vars
|
||
from app.core.context import MediaInfo, TorrentInfo
|
||
from app.core.meta import MetaBase
|
||
from app.db.systemconfig_oper import SystemConfigOper
|
||
from app.log import logger
|
||
from app.schemas.message import Notification
|
||
from app.schemas.tmdb import TmdbEpisode
|
||
from app.schemas.transfer import TransferInfo
|
||
from app.schemas.types import SystemConfigKey
|
||
from app.utils.singleton import Singleton, SingletonClass
|
||
from app.utils.string import StringUtils
|
||
|
||
|
||
class TemplateContextBuilder:
|
||
"""
|
||
模板上下文构建器
|
||
"""
|
||
|
||
def __init__(self):
|
||
self._context = {}
|
||
|
||
def build(
|
||
self,
|
||
meta: Optional[MetaBase] = None,
|
||
mediainfo: Optional[MediaInfo] = None,
|
||
torrentinfo: Optional[TorrentInfo] = None,
|
||
transferinfo: Optional[TransferInfo] = None,
|
||
file_extension: Optional[str] = None,
|
||
episodes_info: Optional[List[TmdbEpisode]] = None,
|
||
include_raw_objects: bool = True,
|
||
**kwargs
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
:param meta: 媒体信息
|
||
:param mediainfo: 媒体信息
|
||
:param torrentinfo: 种子信息
|
||
:param transferinfo: 传输信息
|
||
:param file_extension: 文件扩展名
|
||
:param episodes_info: 剧集信息
|
||
:param include_raw_objects: 是否包含原始对象
|
||
:return: 渲染上下文字典
|
||
"""
|
||
self._context.clear()
|
||
self._add_episode_details(meta, episodes_info)
|
||
self._add_media_info(mediainfo)
|
||
self._add_transfer_info(transferinfo)
|
||
self._add_torrent_info(torrentinfo)
|
||
self._add_file_info(file_extension)
|
||
if kwargs:
|
||
self._context.update(kwargs)
|
||
|
||
if include_raw_objects:
|
||
self._add_raw_objects(meta, mediainfo, torrentinfo, transferinfo, episodes_info)
|
||
|
||
# 移除空值
|
||
return {k: v for k, v in self._context.items() if v is not None}
|
||
|
||
def _add_media_info(self, mediainfo: MediaInfo):
|
||
"""
|
||
增加媒体信息
|
||
"""
|
||
if not mediainfo:
|
||
return
|
||
season_fmt = f"S{mediainfo.season:02d}" if mediainfo.season is not None else None
|
||
base_info = {
|
||
# 标题
|
||
"title": self.__convert_invalid_characters(mediainfo.title),
|
||
# 英文标题
|
||
"en_title": self.__convert_invalid_characters(mediainfo.en_title),
|
||
# 原语种标题
|
||
"original_title": self.__convert_invalid_characters(mediainfo.original_title),
|
||
# 季号
|
||
"season": self._context.get("season") or mediainfo.season,
|
||
# Sxx
|
||
"season_fmt": self._context.get("season_fmt") or season_fmt,
|
||
# 年份
|
||
"year": mediainfo.year or self._context.get("year"),
|
||
# 媒体标题 + 年份
|
||
"title_year": mediainfo.title_year or self._context.get("title_year"),
|
||
}
|
||
|
||
_meta_season = self._context.get("season")
|
||
media_info = {
|
||
# 类型
|
||
"type": mediainfo.type.value,
|
||
# 类别
|
||
"category": mediainfo.category,
|
||
# 评分
|
||
"vote_average": mediainfo.vote_average,
|
||
# 海报
|
||
"poster": mediainfo.get_poster_image(),
|
||
# 背景图
|
||
"backdrop": mediainfo.get_backdrop_image(),
|
||
# 季年份根据season值获取
|
||
"season_year": mediainfo.season_years.get(
|
||
int(_meta_season),
|
||
None) if (mediainfo.season_years and _meta_season) else None,
|
||
# 演员
|
||
"actors": '、 '.join([actor['name'] for actor in mediainfo.actors[:5]]),
|
||
# 简介
|
||
"overview": mediainfo.overview,
|
||
# TMDBID
|
||
"tmdbid": mediainfo.tmdb_id,
|
||
# IMDBID
|
||
"imdbid": mediainfo.imdb_id,
|
||
# 豆瓣ID
|
||
"doubanid": mediainfo.douban_id,
|
||
}
|
||
self._context.update({**base_info, **media_info})
|
||
|
||
def _add_episode_details(self, meta: Optional[MetaBase], episodes: Optional[List[TmdbEpisode]]):
|
||
"""
|
||
添加剧集详细信息
|
||
"""
|
||
if not meta:
|
||
return
|
||
|
||
episode_data = {"episode_title": None, "episode_date": None}
|
||
if meta.begin_episode and episodes:
|
||
for episode in episodes:
|
||
if episode.episode_number == meta.begin_episode:
|
||
episode_data.update({
|
||
"episode_title": self.__convert_invalid_characters(episode.name),
|
||
"episode_date": episode.air_date if episode.air_date else None
|
||
})
|
||
break
|
||
|
||
meta_info = {
|
||
# 原文件名
|
||
"original_name": meta.title,
|
||
# 识别名称(优先使用中文)
|
||
"name": meta.name,
|
||
# 识别的英文名称(可能为空)
|
||
"en_name": meta.en_name,
|
||
# 年份
|
||
"year": meta.year,
|
||
# 名字 + 年份
|
||
"title_year": self._context.get("title_year") or "%s (%s)" % (
|
||
meta.name, meta.year) if meta.year else meta.name,
|
||
# 季号
|
||
"season": meta.season_seq,
|
||
# Sxx
|
||
"season_fmt": meta.season,
|
||
# 集号
|
||
"episode": meta.episode_seqs,
|
||
# 季集 SxxExx
|
||
"season_episode": "%s%s" % (meta.season, meta.episode),
|
||
# 段/节
|
||
"part": meta.part,
|
||
# 自定义占位符
|
||
"customization": meta.customization,
|
||
}
|
||
|
||
tech_metadata = {
|
||
# 资源类型
|
||
"resourceType": meta.resource_type,
|
||
# 特效
|
||
"effect": meta.resource_effect,
|
||
# 版本
|
||
"edition": meta.edition,
|
||
# 分辨率
|
||
"videoFormat": meta.resource_pix,
|
||
# 质量
|
||
"resource_term": meta.resource_term,
|
||
# 制作组/字幕组
|
||
"releaseGroup": meta.resource_team,
|
||
# 视频编码
|
||
"videoCodec": meta.video_encode,
|
||
# 音频编码
|
||
"audioCodec": meta.audio_encode,
|
||
# 流媒体平台
|
||
"webSource": meta.web_source,
|
||
}
|
||
self._context.update({**meta_info, **tech_metadata, **episode_data})
|
||
|
||
def _add_torrent_info(self, torrentinfo: Optional[TorrentInfo]):
|
||
"""
|
||
添加种子信息
|
||
"""
|
||
if not torrentinfo:
|
||
return
|
||
if torrentinfo.size:
|
||
if str(torrentinfo.size).replace(".", "").isdigit():
|
||
size = StringUtils.str_filesize(torrentinfo.size)
|
||
else:
|
||
size = torrentinfo.size
|
||
else:
|
||
size = 0
|
||
|
||
if torrentinfo.description:
|
||
html_re = re.compile(r'<[^>]+>', re.S)
|
||
description = html_re.sub('', torrentinfo.description)
|
||
torrentinfo.description = re.sub(r'<[^>]+>', '', description)
|
||
|
||
torrent_info = {
|
||
# 种子标题
|
||
"torrent_title": torrentinfo.title,
|
||
# 发布时间
|
||
"pubdate": torrentinfo.pubdate,
|
||
# 免费剩余时间
|
||
"freedate": torrentinfo.freedate_diff,
|
||
# 做种数
|
||
"seeders": torrentinfo.seeders,
|
||
# 促销信息
|
||
"volume_factor": torrentinfo.volume_factor,
|
||
# Hit&Run
|
||
"hit_and_run": "是" if torrentinfo.hit_and_run else "否",
|
||
# 种子标签
|
||
"labels": ' '.join(torrentinfo.labels),
|
||
# 描述
|
||
"description": torrentinfo.description,
|
||
# 站点名称
|
||
"site_name": torrentinfo.site_name,
|
||
# 种子大小
|
||
"size": size,
|
||
}
|
||
self._context.update(torrent_info)
|
||
|
||
def _add_transfer_info(self, transferinfo: Optional[TransferInfo]) -> Optional[Dict]:
|
||
"""
|
||
添加文件转移上下文
|
||
"""
|
||
if not transferinfo:
|
||
return None
|
||
ctx = {
|
||
"transfer_type": transferinfo.transfer_type,
|
||
"file_count": transferinfo.file_count,
|
||
"total_size": StringUtils.str_filesize(transferinfo.total_size),
|
||
"err_msg": transferinfo.message,
|
||
}
|
||
return self._context.update(ctx)
|
||
|
||
def _add_file_info(self, file_extension: Optional[str]):
|
||
"""
|
||
添加文件信息
|
||
"""
|
||
if not file_extension:
|
||
return
|
||
file_info = {
|
||
# 文件后缀
|
||
"fileExt": file_extension,
|
||
}
|
||
self._context.update(file_info)
|
||
|
||
def _add_raw_objects(
|
||
self,
|
||
meta: Optional[MetaBase],
|
||
mediainfo: Optional[MediaInfo],
|
||
torrentinfo: Optional[TorrentInfo],
|
||
transferinfo: Optional[TransferInfo],
|
||
episodes_info: Optional[List[TmdbEpisode]],
|
||
):
|
||
"""
|
||
添加原始对象引用
|
||
"""
|
||
raw_objects = {
|
||
# 文件元数据
|
||
"__meta__": meta,
|
||
# 识别的媒体信息
|
||
"__mediainfo__": mediainfo,
|
||
# 种子信息
|
||
"__torrentinfo__": torrentinfo,
|
||
# 文件转移信息
|
||
"__transferinfo__": transferinfo,
|
||
# 当前季的全部集信息
|
||
"__episodes_info__": episodes_info,
|
||
}
|
||
self._context.update(raw_objects)
|
||
|
||
@staticmethod
|
||
def __convert_invalid_characters(filename: str):
|
||
"""
|
||
将不支持的字符转换为全角字符
|
||
"""
|
||
if not filename:
|
||
return filename
|
||
invalid_characters = r'\/:*?"<>|'
|
||
# 创建半角到全角字符的转换表
|
||
halfwidth_chars = "".join([chr(i) for i in range(33, 127)])
|
||
fullwidth_chars = "".join([chr(i + 0xFEE0) for i in range(33, 127)])
|
||
translation_table = str.maketrans(halfwidth_chars, fullwidth_chars)
|
||
# 将不支持的字符替换为对应的全角字符
|
||
for char in invalid_characters:
|
||
filename = filename.replace(char, char.translate(translation_table))
|
||
return filename
|
||
|
||
|
||
class TemplateHelper(metaclass=SingletonClass):
|
||
"""
|
||
模板格式渲染帮助类
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.builder = TemplateContextBuilder()
|
||
self.cache = TTLCache(region="notification", maxsize=100, ttl=600)
|
||
|
||
@staticmethod
|
||
def _generate_cache_key(cuntent: Union[str, dict]) -> str:
|
||
"""
|
||
生成缓存键
|
||
"""
|
||
if isinstance(cuntent, dict):
|
||
base_str = cuntent.get("title", '') + cuntent.get("text", '')
|
||
return StringUtils.md5_hash(json.dumps(base_str, sort_keys=True, ensure_ascii=False))
|
||
|
||
return StringUtils.md5_hash(cuntent)
|
||
|
||
def get_cache_context(self, cuntent: Union[str, dict]) -> Optional[dict]:
|
||
"""
|
||
获取缓存上下文
|
||
"""
|
||
cache_key = self._generate_cache_key(cuntent)
|
||
return self.cache.get(cache_key)
|
||
|
||
def set_cache_context(self, cuntent: Union[str, dict], context: dict) -> None:
|
||
"""
|
||
设置缓存上下文
|
||
"""
|
||
cache_key = self._generate_cache_key(cuntent)
|
||
self.cache[cache_key] = context
|
||
|
||
def render(self,
|
||
template_content: str,
|
||
template_type: Literal['string', 'dict', 'literal'] = "literal",
|
||
**kwargs) -> Optional[Union[str, dict]]:
|
||
"""
|
||
根据模板格式渲染内容
|
||
:param template_content: 模板字符串
|
||
:param template_type: 模板字符串类型(消息通知`literal`, 路径`string`)
|
||
:param kwargs: 补传业务对象
|
||
:raises ValueError: 当模板处理过程中出现错误
|
||
:return: 渲染后的结果
|
||
"""
|
||
try:
|
||
# 解析模板字符
|
||
parsed = self.parse_template_content(template_content, template_type)
|
||
if not parsed:
|
||
raise ValueError("模板解析失败")
|
||
|
||
context = self.builder.build(**kwargs)
|
||
if not context:
|
||
raise ValueError("上下文构建失败")
|
||
|
||
rendered = self.render_with_context(parsed, context)
|
||
if not rendered:
|
||
raise ValueError("模板渲染失败")
|
||
|
||
if rendered := rendered if template_type == 'string' else self.__process_formatted_string(rendered):
|
||
# 缓存上下文
|
||
self.set_cache_context(rendered, context)
|
||
# 返回渲染结果
|
||
return rendered
|
||
return None
|
||
except Exception as e:
|
||
raise ValueError(f"模板处理失败: {str(e)}") from e
|
||
|
||
@staticmethod
|
||
def render_with_context(template_content: str, context: dict) -> str:
|
||
"""
|
||
使用指定上下文渲染 Jinja2 模板字符串
|
||
template_content: Jinja2 模板字符串
|
||
context: 渲染用的上下文数据
|
||
"""
|
||
# 渲染模板
|
||
template = Template(template_content)
|
||
return template.render(context)
|
||
|
||
@staticmethod
|
||
def parse_template_content(template_content: Union[str, dict],
|
||
template_type: Literal['string', 'dict', 'literal'] = None) -> Optional[str]:
|
||
"""
|
||
解析模板字符
|
||
:param template_content 模板格式字符
|
||
:param template_type 模板字符类型
|
||
"""
|
||
|
||
def parse_literal(_template_content: str) -> str:
|
||
"""
|
||
解析Python字面量
|
||
"""
|
||
try:
|
||
template_dict = ast.literal_eval(_template_content) if isinstance(_template_content,
|
||
str) else _template_content
|
||
if not isinstance(template_dict, dict):
|
||
raise ValueError("解析结果必须是一个字典")
|
||
return json.dumps(template_dict, ensure_ascii=False)
|
||
except (ValueError, SyntaxError) as err:
|
||
raise ValueError(f"无效的Python字面量格式: {str(err)}")
|
||
|
||
try:
|
||
if template_type:
|
||
parse_map = {
|
||
'string': lambda x: str(x),
|
||
'dict': lambda x: json.dumps(x, ensure_ascii=False),
|
||
'literal': parse_literal
|
||
}
|
||
return parse_map[template_type](template_content)
|
||
|
||
# 自动判断模板类型
|
||
if isinstance(template_content, dict):
|
||
return json.dumps(template_content, ensure_ascii=False)
|
||
elif isinstance(template_content, str):
|
||
try:
|
||
json.loads(template_content)
|
||
return template_content
|
||
except json.JSONDecodeError:
|
||
try:
|
||
return parse_literal(template_content)
|
||
except (ValueError, SyntaxError):
|
||
return template_content
|
||
else:
|
||
raise ValueError(f"不支持的模板类型: {type(template_content)}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"模板解析失败: {str(e)}")
|
||
return None
|
||
|
||
@staticmethod
|
||
def __process_formatted_string(rendered: str) -> Optional[Union[dict, str]]:
|
||
"""
|
||
处理格式化字符串
|
||
保留转义字符
|
||
"""
|
||
|
||
def restore_chars(obj: Any) -> Any:
|
||
"""恢复特殊字符"""
|
||
if isinstance(obj, str):
|
||
return obj.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', '\t').replace('\\b', '\b').replace(
|
||
'\\f', '\f')
|
||
elif isinstance(obj, dict):
|
||
return {k: restore_chars(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [restore_chars(item) for item in obj]
|
||
return obj
|
||
|
||
# 定义特殊字符映射
|
||
|
||
special_chars = {
|
||
'\n': '\\n', # 换行符
|
||
'\r': '\\r', # 回车符
|
||
'\t': '\\t', # 制表符
|
||
'\b': '\\b', # 退格符
|
||
'\f': '\\f', # 换页符
|
||
}
|
||
|
||
# 处理特殊字符
|
||
processed = rendered
|
||
for char, escape in special_chars.items():
|
||
processed = processed.replace(char, escape)
|
||
|
||
# 尝试解析为JSON
|
||
try:
|
||
rendered_dict = json.loads(processed)
|
||
return restore_chars(rendered_dict)
|
||
except json.JSONDecodeError:
|
||
return rendered
|
||
|
||
def close(self):
|
||
"""
|
||
清理资源
|
||
"""
|
||
if self.cache:
|
||
self.cache.close()
|
||
|
||
|
||
class MessageTemplateHelper:
|
||
"""
|
||
消息模板渲染器
|
||
"""
|
||
|
||
@staticmethod
|
||
def render(message: Notification, *args, **kwargs) -> Optional[Notification]:
|
||
"""
|
||
渲染消息模板
|
||
"""
|
||
if not MessageTemplateHelper.is_instance_valid(message):
|
||
if MessageTemplateHelper.meets_update_conditions(message, *args, **kwargs):
|
||
logger.info("将使用模板渲染消息内容")
|
||
return MessageTemplateHelper._apply_template_data(message, *args, **kwargs)
|
||
return message
|
||
|
||
@staticmethod
|
||
def is_instance_valid(message: Notification) -> bool:
|
||
"""
|
||
检查消息是否有效
|
||
"""
|
||
if isinstance(message, Notification):
|
||
return bool(message.title or message.text)
|
||
return False
|
||
|
||
@staticmethod
|
||
def meets_update_conditions(message: Notification, *args, **kwargs) -> bool:
|
||
"""
|
||
判断是否满足消息实例更新条件
|
||
|
||
满足条件需同时具备:
|
||
1. 消息为有效Notification实例
|
||
2. 消息指定了模板类型(ctype)
|
||
3. 存在待渲染的模板变量数据
|
||
"""
|
||
if isinstance(message, Notification):
|
||
return True if message.ctype and (args or kwargs) else False
|
||
return False
|
||
|
||
@staticmethod
|
||
def _apply_template_data(message: Notification, *args, **kwargs) -> Optional[Notification]:
|
||
"""
|
||
更新消息实例
|
||
"""
|
||
try:
|
||
if template := MessageTemplateHelper._get_template(message):
|
||
rendered = TemplateHelper().render(template_content=template, *args, **kwargs)
|
||
for key, value in rendered.items():
|
||
if hasattr(message, key):
|
||
setattr(message, key, value)
|
||
return message
|
||
except Exception as e:
|
||
logger.error(f"更新Notification时出现错误:{str(e)}")
|
||
return message
|
||
|
||
@staticmethod
|
||
def _get_template(message: Notification) -> Optional[str]:
|
||
"""
|
||
获取消息模板
|
||
"""
|
||
template_dict: dict[str, str] = SystemConfigOper().get(SystemConfigKey.NotificationTemplates)
|
||
return template_dict.get(message.ctype.value)
|
||
|
||
|
||
class MessageQueueManager(metaclass=SingletonClass):
|
||
"""
|
||
消息发送队列管理器
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
send_callback: Optional[Callable] = None,
|
||
check_interval: Optional[int] = 10
|
||
) -> None:
|
||
"""
|
||
消息队列管理器初始化
|
||
|
||
:param send_callback: 实际发送消息的回调函数
|
||
:param check_interval: 时间检查间隔(秒)
|
||
"""
|
||
self.schedule_periods: List[tuple[int, int, int, int]] = []
|
||
|
||
self.init_config()
|
||
|
||
self.queue: queue.Queue[Any] = queue.Queue()
|
||
self.send_callback = send_callback
|
||
self.check_interval = check_interval
|
||
|
||
self._running = True
|
||
self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
|
||
self.thread.start()
|
||
|
||
def init_config(self):
|
||
"""
|
||
初始化配置
|
||
"""
|
||
self.schedule_periods = self._parse_schedule(
|
||
SystemConfigOper().get(SystemConfigKey.NotificationSendTime)
|
||
)
|
||
|
||
@staticmethod
|
||
def _parse_schedule(periods: Union[list, dict]) -> List[tuple[int, int, int, int]]:
|
||
"""
|
||
将字符串时间格式转换为分钟数元组
|
||
支持格式为 'HH:MM' 或 'HH:MM:SS' 的时间字符串
|
||
"""
|
||
parsed = []
|
||
if not periods:
|
||
return parsed
|
||
if not isinstance(periods, list):
|
||
periods = [periods]
|
||
for period in periods:
|
||
if not period:
|
||
continue
|
||
if not period.get('start') or not period.get('end'):
|
||
continue
|
||
try:
|
||
# 处理 start 时间
|
||
start_parts = period['start'].split(':')
|
||
if len(start_parts) == 2:
|
||
start_h, start_m = map(int, start_parts)
|
||
elif len(start_parts) >= 3:
|
||
start_h, start_m = map(int, start_parts[:2]) # 只取前两个部分 (HH:MM)
|
||
else:
|
||
continue
|
||
# 处理 end 时间
|
||
end_parts = period['end'].split(':')
|
||
if len(end_parts) == 2:
|
||
end_h, end_m = map(int, end_parts)
|
||
elif len(end_parts) >= 3:
|
||
end_h, end_m = map(int, end_parts[:2]) # 只取前两个部分 (HH:MM)
|
||
else:
|
||
continue
|
||
|
||
parsed.append((start_h, start_m, end_h, end_m))
|
||
except ValueError as e:
|
||
logger.error(f"解析时间周期时出现错误:{period}. 错误:{str(e)}. 跳过此周期。")
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"解析时间周期时出现意外错误:{period}. 错误:{str(e)}. 跳过此周期。")
|
||
continue
|
||
return parsed
|
||
|
||
@staticmethod
|
||
def _time_to_minutes(time_str: str) -> int:
|
||
"""
|
||
将 'HH:MM' 格式转换为分钟数
|
||
"""
|
||
hours, minutes = map(int, time_str.split(':'))
|
||
return hours * 60 + minutes
|
||
|
||
def _is_in_scheduled_time(self, current_time: datetime) -> bool:
|
||
"""
|
||
检查当前时间是否在允许发送的时间段内
|
||
"""
|
||
if not self.schedule_periods:
|
||
return True
|
||
current_minutes = current_time.hour * 60 + current_time.minute
|
||
for period in self.schedule_periods:
|
||
s_h, s_m, e_h, e_m = period
|
||
start = s_h * 60 + s_m
|
||
end = e_h * 60 + e_m
|
||
|
||
if start <= end:
|
||
if start <= current_minutes <= end:
|
||
return True
|
||
else:
|
||
if current_minutes >= start or current_minutes <= end:
|
||
return True
|
||
return False
|
||
|
||
def send_message(self, *args, **kwargs) -> None:
|
||
"""
|
||
发送消息(立即发送或加入队列)
|
||
"""
|
||
immediately = kwargs.pop("immediately", False)
|
||
if immediately or self._is_in_scheduled_time(datetime.now()):
|
||
self._send(*args, **kwargs)
|
||
else:
|
||
self.queue.put({
|
||
"args": args,
|
||
"kwargs": kwargs
|
||
})
|
||
logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}")
|
||
|
||
async def async_send_message(self, *args, **kwargs) -> None:
|
||
"""
|
||
异步发送消息(直接加入队列)
|
||
"""
|
||
kwargs.pop("immediately", False)
|
||
self.queue.put({
|
||
"args": args,
|
||
"kwargs": kwargs
|
||
})
|
||
logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}")
|
||
|
||
def _send(self, *args, **kwargs) -> None:
|
||
"""
|
||
实际发送消息(可通过回调函数自定义)
|
||
"""
|
||
if self.send_callback:
|
||
try:
|
||
logger.info(f"发送消息:{kwargs}")
|
||
self.send_callback(*args, **kwargs)
|
||
except Exception as e:
|
||
logger.error(f"发送消息错误:{str(e)}")
|
||
|
||
def _monitor_loop(self) -> None:
|
||
"""
|
||
后台线程循环检查时间并处理队列
|
||
"""
|
||
while self._running:
|
||
current_time = datetime.now()
|
||
if self._is_in_scheduled_time(current_time):
|
||
while not self.queue.empty():
|
||
if global_vars.is_system_stopped:
|
||
break
|
||
if not self._is_in_scheduled_time(datetime.now()):
|
||
break
|
||
try:
|
||
message = self.queue.get_nowait()
|
||
self._send(*message['args'], **message['kwargs'])
|
||
logger.info(f"队列剩余消息:{self.queue.qsize()}")
|
||
except queue.Empty:
|
||
break
|
||
time.sleep(self.check_interval)
|
||
|
||
def stop(self) -> None:
|
||
"""
|
||
停止队列管理器
|
||
"""
|
||
self._running = False
|
||
logger.info("正在停止消息队列...")
|
||
self.thread.join()
|
||
logger.info("消息队列已停止")
|
||
|
||
|
||
class MessageHelper(metaclass=Singleton):
|
||
"""
|
||
消息队列管理器,包括系统消息和用户消息
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.sys_queue = queue.Queue()
|
||
self.user_queue = queue.Queue()
|
||
|
||
def put(self, message: Any, role: str = "plugin", title: str = None, note: Union[list, dict] = None):
|
||
"""
|
||
存消息
|
||
:param message: 消息
|
||
:param role: 消息通道 systm:系统消息,plugin:插件消息,user:用户消息
|
||
:param title: 标题
|
||
:param note: 附件json
|
||
"""
|
||
if role in ["system", "plugin"]:
|
||
# 没有标题时获取插件名称
|
||
if role == "plugin" and not title:
|
||
title = "插件通知"
|
||
# 系统通知,默认
|
||
self.sys_queue.put(json.dumps({
|
||
"type": role,
|
||
"title": title,
|
||
"text": message,
|
||
"date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||
"note": note
|
||
}))
|
||
else:
|
||
if isinstance(message, str):
|
||
# 非系统的文本通知
|
||
self.user_queue.put(json.dumps({
|
||
"title": title,
|
||
"text": message,
|
||
"date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||
"note": note
|
||
}))
|
||
elif hasattr(message, "to_dict"):
|
||
# 非系统的复杂结构通知,如媒体信息/种子列表等。
|
||
content = message.to_dict()
|
||
content['title'] = title
|
||
content['date'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
content['note'] = note
|
||
self.user_queue.put(json.dumps(content))
|
||
|
||
def get(self, role: str = "system") -> Optional[str]:
|
||
"""
|
||
取消息
|
||
:param role: 消息通道 systm:系统消息,plugin:插件消息,user:用户消息
|
||
"""
|
||
if role == "system":
|
||
if not self.sys_queue.empty():
|
||
return self.sys_queue.get(block=False)
|
||
else:
|
||
if not self.user_queue.empty():
|
||
return self.user_queue.get(block=False)
|
||
return None
|
||
|
||
|
||
def stop_message():
|
||
"""
|
||
停止消息服务
|
||
"""
|
||
# 停止消息队列
|
||
MessageQueueManager().stop()
|
||
# 关闭消息演染器
|
||
TemplateHelper().close()
|