通知渠道增加Discord

This commit is contained in:
HankunYu
2025-12-22 02:15:28 +00:00
parent 91fc4327f4
commit 4bd08bd915
3 changed files with 280 additions and 0 deletions

View File

@@ -0,0 +1,131 @@
from typing import Optional, Union, List, Tuple, Any
from app.core.context import MediaInfo, Context
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.discord.discord import Discord
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import ModuleType
class DiscordModule(_ModuleBase, _MessageBase[Discord]):
def init_module(self) -> None:
"""
初始化模块
"""
super().init_service(service_name=Discord.__name__.lower(),
service_type=Discord)
self._channel = MessageChannel.Discord
@staticmethod
def get_name() -> str:
return "Discord"
@staticmethod
def get_type() -> ModuleType:
"""
获取模块类型
"""
return ModuleType.Notification
@staticmethod
def get_subtype() -> MessageChannel:
"""
获取模块子类型
"""
return MessageChannel.Discord
@staticmethod
def get_priority() -> int:
"""
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
"""
return 4
def stop(self):
"""
停止模块
"""
for client in self.get_instances().values():
client.stop()
def test(self) -> Optional[Tuple[bool, str]]:
"""
测试模块连接性
"""
if not self.get_instances():
return None
for name, client in self.get_instances().items():
state = client.get_state()
if not state:
return False, f"Discord {name} webhook URL 未配置"
return True, ""
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def message_parser(self, source: str, body: Any, form: Any, args: Any) -> Optional[CommingMessage]:
"""
解析消息内容,返回字典,注意以下约定值:
userid: 用户ID
username: 用户名
text: 内容
:param source: 消息来源
:param body: 请求体
:param form: 表单
:param args: 参数
:return: 渠道、消息体
"""
# Discord 模块暂时不支持接收消息
return None
def post_message(self, message: Notification, **kwargs) -> None:
"""
发送通知消息
:param message: 消息通知对象
"""
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
client: Discord = self.get_instance(conf.name)
if client:
client.send_msg(title=message.title, text=message.text,
image=message.image, userid=message.userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
发送媒体信息选择列表
:param message: 消息体
:param medias: 媒体信息
:return: 成功或失败
"""
logger.warn("Discord webhooks 不支持")
return None
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
发送种子信息选择列表
:param message: 消息体
:param torrents: 种子信息
:return: 成功或失败
"""
logger.warn("Discord webhooks 不支持")
return False
def delete_message(self, channel: MessageChannel, source: str,
message_id: str, chat_id: Optional[str] = None) -> bool:
"""
删除消息
:param channel: 消息渠道
:param source: 指定的消息源
:param message_id: 消息IDSlack中为时间戳
:param chat_id: 聊天ID频道ID
:return: 删除是否成功
"""
logger.warn("Discord webhooks 不支持")
return False

View File

@@ -0,0 +1,148 @@
import re
from typing import Optional, List, Dict
from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.metainfo import MetaInfo
from app.helper.image import ImageHelper
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
class Discord:
"""
Discord Webhook通知实现
"""
_webhook_url: Optional[str] = None
_username: Optional[str] = None
_avatar_url: Optional[str] = None
def __init__(self, DISCORD_WEBHOOK_URL: Optional[str] = None,
DISCORD_USERNAME: Optional[str] = None,
DISCORD_AVATAR_URL: Optional[str] = None, **kwargs):
"""
初始化Discord webhook客户端
:param DISCORD_WEBHOOK_URL: Discord webhook URL
:param DISCORD_USERNAME: 自定义webhook消息的用户名
:param DISCORD_AVATAR_URL: 自定义webhook消息的头像URL
"""
if not DISCORD_WEBHOOK_URL:
logger.error("Discord webhook URL未配置")
return
self._webhook_url = DISCORD_WEBHOOK_URL
self._username = DISCORD_USERNAME or "MoviePilot"
self._avatar_url = DISCORD_AVATAR_URL
def get_state(self) -> bool:
"""
获取服务状态
:return: Webhook URL已配置则返回True
"""
return self._webhook_url is not None
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None,
userid: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
通过webhook发送Discord消息
:param title: 消息标题
:param text: 消息内容
:param image: 消息图片URL
:param userid: 用户IDwebhook不使用
:param link: 跳转链接
:param buttons: 按钮列表基础webhook不支持
:param original_message_id: 原消息ID不支持编辑
:param original_chat_id: 原聊天ID不支持编辑
:return: 成功或失败
"""
if not self._webhook_url:
return None
if not title and not text:
logger.warn("标题和内容不能同时为空")
return False
try:
# 解析消息内容,构建 fields 数组
fields = []
converted_text = ' '
if text:
# 按逗号分割消息内容
lines = text.splitlines()
# 遍历每行内容
for line in lines:
# 将每行内容按冒号分割为字段名称和值
if '' not in line:
converted_text = line
else:
name, value = line.split('', 1)
# 创建一个字典表示一个 field
field = {
"name": name.strip(),
"value": value.strip(),
"inline": False
}
# 将 field 添加到 fields 列表中
fields.append(field)
# 构建 embed
embed = {
"title": title,
"url": link if link else "https://github.com/jxxghp/MoviePilot",
"color": 15258703,
"description": converted_text if converted_text else text,
"fields": fields
}
# 添加图片
if image:
# 获取并验证图片
image_content = ImageHelper().fetch_image(image)
if image_content:
embed["image"] = {
"url": image
}
else:
logger.warn(f"获取图片失败: {image},将不带图片发送")
# 构建payload
payload = {
"username": self._username,
"embeds": [embed]
}
# 添加自定义头像
if self._avatar_url:
payload["avatar_url"] = self._avatar_url
# 发送webhook请求
response = RequestUtils(
timeout=10,
content_type="application/json"
).post_res(
url=self._webhook_url,
json=payload
)
if response and response.status_code == 204:
# logger.info("Discord消息发送成功")
return True
else:
logger.error(f"Discord消息发送失败: {response.status_code if response else 'No response'}")
return False
except Exception as e:
logger.error(f"发送Discord消息时出现异常: {str(e)}")
return False
def stop(self):
"""
停止Discord服务webhook无需清理
"""
pass