Files
MoviePilot/app/modules/discord/discord.py
2026-01-12 00:46:26 +00:00

608 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import re
import threading
from typing import Optional, List, Dict, Any, Tuple, Union
import discord
from discord import app_commands
import httpx
from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.metainfo import MetaInfo
from app.log import logger
from app.schemas.types import NotificationType
from app.utils.string import StringUtils
# Discord embed 字段解析白名单
# 只有这些消息类型会使用复杂的字段解析逻辑
PARSE_FIELD_TYPES = {
NotificationType.Download, # 资源下载
NotificationType.Organize, # 整理入库
NotificationType.Subscribe, # 订阅
NotificationType.Manual, # 手动处理
}
class Discord:
"""
Discord Bot 通知与交互实现(基于 discord.py 2.6.4
"""
def __init__(self, DISCORD_BOT_TOKEN: Optional[str] = None,
DISCORD_GUILD_ID: Optional[Union[str, int]] = None,
DISCORD_CHANNEL_ID: Optional[Union[str, int]] = None,
**kwargs):
if not DISCORD_BOT_TOKEN:
logger.error("Discord Bot Token 未配置!")
return
self._token = DISCORD_BOT_TOKEN
self._guild_id = self._to_int(DISCORD_GUILD_ID)
self._channel_id = self._to_int(DISCORD_CHANNEL_ID)
base_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/message/"
self._ds_url = f"{base_ds_url}?token={settings.API_TOKEN}"
if kwargs.get("name"):
self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}"
intents = discord.Intents.default()
intents.message_content = True
intents.messages = True
intents.guilds = True
self._client: Optional[discord.Client] = discord.Client(
intents=intents,
proxy=settings.PROXY_HOST
)
self._tree: Optional[app_commands.CommandTree] = None
self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self._thread: Optional[threading.Thread] = None
self._ready_event = threading.Event()
self._user_dm_cache: Dict[str, discord.DMChannel] = {}
self._broadcast_channel = None
self._bot_user_id: Optional[int] = None
self._register_events()
self._start()
@staticmethod
def _to_int(val: Optional[Union[str, int]]) -> Optional[int]:
try:
return int(val) if val is not None and str(val).strip() else None
except ValueError:
return None
def _register_events(self):
@self._client.event
async def on_ready():
self._bot_user_id = self._client.user.id if self._client.user else None
self._ready_event.set()
logger.info(f"Discord Bot 已登录:{self._client.user}")
@self._client.event
async def on_message(message: discord.Message):
if message.author.bot:
return
if not self._should_process_message(message):
return
cleaned_text = self._clean_bot_mention(message.content or "")
username = message.author.display_name or message.author.global_name or message.author.name
payload = {
"type": "message",
"userid": str(message.author.id),
"username": username,
"user_tag": str(message.author),
"text": cleaned_text,
"message_id": str(message.id),
"chat_id": str(message.channel.id),
"channel_type": "dm" if isinstance(message.channel, discord.DMChannel) else "guild"
}
await self._post_to_ds(payload)
@self._client.event
async def on_interaction(interaction: discord.Interaction):
if interaction.type == discord.InteractionType.component:
data = interaction.data or {}
callback_data = data.get("custom_id")
if not callback_data:
return
try:
await interaction.response.defer(ephemeral=True)
except Exception as e:
logger.error(f"处理 Discord 交互响应失败:{e}")
username = (interaction.user.display_name or interaction.user.global_name or interaction.user.name) \
if interaction.user else None
payload = {
"type": "interaction",
"userid": str(interaction.user.id) if interaction.user else None,
"username": username,
"user_tag": str(interaction.user) if interaction.user else None,
"callback_data": callback_data,
"message_id": str(interaction.message.id) if interaction.message else None,
"chat_id": str(interaction.channel.id) if interaction.channel else None
}
await self._post_to_ds(payload)
def _start(self):
if self._thread:
return
def runner():
asyncio.set_event_loop(self._loop)
try:
self._loop.create_task(self._client.start(self._token))
self._loop.run_forever()
except Exception as err:
logger.error(f"Discord Bot 启动失败:{err}")
finally:
try:
self._loop.run_until_complete(self._client.close())
except Exception as err:
logger.debug(f"Discord Bot 关闭失败:{err}")
self._thread = threading.Thread(target=runner, daemon=True)
self._thread.start()
def stop(self):
if not self._client or not self._loop or not self._thread:
return
try:
asyncio.run_coroutine_threadsafe(self._client.close(), self._loop).result(timeout=10)
except Exception as err:
logger.error(f"关闭 Discord Bot 失败:{err}")
finally:
try:
self._loop.call_soon_threadsafe(self._loop.stop)
except Exception as err:
logger.error(f"停止 Discord 事件循环失败:{err}")
self._ready_event.clear()
def get_state(self) -> bool:
return self._ready_event.is_set() and self._client is not None
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None,
userid: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[Union[int, str]] = None,
original_chat_id: Optional[str] = None,
mtype: Optional['NotificationType'] = None) -> Optional[bool]:
if not self.get_state():
return False
if not title and not text:
logger.warn("标题和内容不能同时为空")
return False
try:
future = asyncio.run_coroutine_threadsafe(
self._send_message(title=title, text=text, image=image, userid=userid,
link=link, buttons=buttons,
original_message_id=original_message_id,
original_chat_id=original_chat_id,
mtype=mtype),
self._loop)
return future.result(timeout=30)
except Exception as err:
logger.error(f"发送 Discord 消息失败:{err}")
return False
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None, title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[Union[int, str]] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
if not self.get_state() or not medias:
return False
title = title or "媒体列表"
try:
future = asyncio.run_coroutine_threadsafe(
self._send_list_message(
embeds=self._build_media_embeds(medias, title),
userid=userid,
buttons=self._build_default_buttons(len(medias)) if not buttons else buttons,
fallback_buttons=buttons,
original_message_id=original_message_id,
original_chat_id=original_chat_id
),
self._loop
)
return future.result(timeout=30)
except Exception as err:
logger.error(f"发送 Discord 媒体列表失败:{err}")
return False
def send_torrents_msg(self, torrents: List[Context], userid: Optional[str] = None, title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[Union[int, str]] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
if not self.get_state() or not torrents:
return False
title = title or "种子列表"
try:
future = asyncio.run_coroutine_threadsafe(
self._send_list_message(
embeds=self._build_torrent_embeds(torrents, title),
userid=userid,
buttons=self._build_default_buttons(len(torrents)) if not buttons else buttons,
fallback_buttons=buttons,
original_message_id=original_message_id,
original_chat_id=original_chat_id
),
self._loop
)
return future.result(timeout=30)
except Exception as err:
logger.error(f"发送 Discord 种子列表失败:{err}")
return False
def delete_msg(self, message_id: Union[str, int], chat_id: Optional[str] = None) -> Optional[bool]:
if not self.get_state():
return False
try:
future = asyncio.run_coroutine_threadsafe(
self._delete_message(message_id=message_id, chat_id=chat_id),
self._loop
)
return future.result(timeout=15)
except Exception as err:
logger.error(f"删除 Discord 消息失败:{err}")
return False
async def _send_message(self, title: str, text: Optional[str], image: Optional[str],
userid: Optional[str], link: Optional[str],
buttons: Optional[List[List[dict]]],
original_message_id: Optional[Union[int, str]],
original_chat_id: Optional[str],
mtype: Optional['NotificationType'] = None) -> bool:
channel = await self._resolve_channel(userid=userid, chat_id=original_chat_id)
if not channel:
logger.error("未找到可用的 Discord 频道或私聊")
return False
embed = self._build_embed(title=title, text=text, image=image, link=link, mtype=mtype)
view = self._build_view(buttons=buttons, link=link)
content = None
if original_message_id and original_chat_id:
return await self._edit_message(chat_id=original_chat_id, message_id=original_message_id,
content=content, embed=embed, view=view)
await channel.send(content=content, embed=embed, view=view)
return True
async def _send_list_message(self, embeds: List[discord.Embed],
userid: Optional[str],
buttons: Optional[List[List[dict]]],
fallback_buttons: Optional[List[List[dict]]],
original_message_id: Optional[Union[int, str]],
original_chat_id: Optional[str]) -> bool:
channel = await self._resolve_channel(userid=userid, chat_id=original_chat_id)
if not channel:
logger.error("未找到可用的 Discord 频道或私聊")
return False
view = self._build_view(buttons=buttons if buttons else fallback_buttons)
embeds = embeds[:10] if embeds else [] # Discord 单条消息最多 10 个 embed
if original_message_id and original_chat_id:
return await self._edit_message(chat_id=original_chat_id, message_id=original_message_id,
content=None, embed=None, view=view, embeds=embeds)
await channel.send(embed=embeds[0] if len(embeds) == 1 else None,
embeds=embeds if len(embeds) > 1 else None,
view=view)
return True
async def _edit_message(self, chat_id: Union[str, int], message_id: Union[str, int],
content: Optional[str], embed: Optional[discord.Embed],
view: Optional[discord.ui.View], embeds: Optional[List[discord.Embed]] = None) -> bool:
channel = await self._resolve_channel(chat_id=str(chat_id))
if not channel:
logger.error(f"未找到要编辑的 Discord 频道:{chat_id}")
return False
try:
message = await channel.fetch_message(int(message_id))
kwargs: Dict[str, Any] = {"content": content, "view": view}
if embeds:
if len(embeds) == 1:
kwargs["embed"] = embeds[0]
else:
kwargs["embeds"] = embeds
elif embed:
kwargs["embed"] = embed
await message.edit(**kwargs)
return True
except Exception as err:
logger.error(f"编辑 Discord 消息失败:{err}")
return False
async def _delete_message(self, message_id: Union[str, int], chat_id: Optional[str]) -> bool:
channel = await self._resolve_channel(chat_id=chat_id)
if not channel:
logger.error("删除 Discord 消息时未找到频道")
return False
try:
message = await channel.fetch_message(int(message_id))
await message.delete()
return True
except Exception as err:
logger.error(f"删除 Discord 消息失败:{err}")
return False
@staticmethod
def _build_embed(title: str, text: Optional[str], image: Optional[str],
link: Optional[str], mtype: Optional['NotificationType'] = None) -> discord.Embed:
fields: List[Dict[str, str]] = []
desc_lines: List[str] = []
should_parse_fields = mtype in PARSE_FIELD_TYPES if mtype else False
def _collect_spans(s: str, left: str, right: str) -> List[Tuple[int, int]]:
spans: List[Tuple[int, int]] = []
start = 0
while True:
l_idx = s.find(left, start)
if l_idx == -1:
break
r_idx = s.find(right, l_idx + 1)
if r_idx == -1:
break
spans.append((l_idx, r_idx))
start = r_idx + 1
return spans
def _find_colon_index(s: str, m: re.Match) -> Optional[int]:
segment = s[m.start():m.end()]
for i, ch in enumerate(segment):
if ch in (":", ""):
return m.start() + i
return None
if text:
# 处理上游未反序列化的 "\n" 等转义换行,避免被当成普通字符
if "\\n" in text or "\\r" in text:
text = text.replace("\\r\\n", "\n").replace("\\n", "\n").replace("\\r", "\n")
if not should_parse_fields:
desc_lines.append(text.strip())
else:
# 匹配形如 "字段:值" 的片段,字段名不允许包含常见分隔符;
# 下一个字段需以顿号/逗号/分号等分隔开,且不能是 URL 协议开头,避免值里出现 URL 的":" 被误拆
# 字段名允许 emoji 等 Unicode 字符,但排除空白/分隔符/冒号
name_re = r"[^\s:,。;;、]+"
pair_pattern = re.compile(
rf"({name_re})[:](.*?)(?=(?:[,。;;、]+\s*(?!https?://|ftp://|ftps://|magnet:){name_re}[:])|$)",
re.IGNORECASE,
)
for line in text.splitlines():
line = line.strip()
if not line:
continue
matches = list(pair_pattern.finditer(line))
if matches:
book_spans = _collect_spans(line, "", "") + _collect_spans(line, "", "")
if book_spans:
has_book_colon = False
for m in matches:
colon_idx = _find_colon_index(line, m)
if colon_idx is not None and any(l < colon_idx < r for l, r in book_spans):
has_book_colon = True
break
if has_book_colon:
desc_lines.append(line)
continue
# 若整行只是 URL/时间等自然包含":"的内容,则不当作字段
url_like_names = {"http", "https", "ftp", "ftps", "magnet"}
if all(m.group(1).lower() in url_like_names or m.group(1).isdigit() for m in matches):
desc_lines.append(line)
continue
last_end = 0
for m in matches:
# 追加匹配前的非空文本到描述
prefix = line[last_end:m.start()].strip(" ,;;。、")
# 仅当前缀不全是分隔符/空白时才记录
if prefix and prefix.strip(" ,;;。、"):
desc_lines.append(prefix)
name = m.group(1).strip()
value = m.group(2).strip(" ,;;。、\t") or "-"
if name:
fields.append({"name": name, "value": value, "inline": False})
last_end = m.end()
# 匹配末尾后的文本
suffix = line[last_end:].strip(" ,;;。、")
if suffix and suffix.strip(" ,;;。、"):
desc_lines.append(suffix)
else:
desc_lines.append(line)
description = "\n".join(desc_lines).strip()
if not description and not fields and text:
description = text.strip()
embed = discord.Embed(
title=title,
url=link or "https://github.com/jxxghp/MoviePilot",
description=description if description else None,
color=0xE67E22
)
for field in fields:
embed.add_field(name=field["name"], value=field["value"], inline=False)
if image:
embed.set_image(url=image)
return embed
@staticmethod
def _build_media_embeds(medias: List[MediaInfo], title: str) -> List[discord.Embed]:
embeds: List[discord.Embed] = []
for index, media in enumerate(medias[:10], start=1):
overview = media.get_overview_string(80)
desc_parts = [
f"{media.type.value} | {media.vote_star}" if media.vote_star else media.type.value,
overview
]
embed = discord.Embed(
title=f"{index}. {media.title_year}",
url=media.detail_link or discord.Embed.Empty,
description="\n".join([p for p in desc_parts if p]),
color=0x5865F2
)
if media.get_poster_image():
embed.set_thumbnail(url=media.get_poster_image())
embeds.append(embed)
if embeds:
embeds[0].set_author(name=title)
return embeds
@staticmethod
def _build_torrent_embeds(torrents: List[Context], title: str) -> List[discord.Embed]:
embeds: List[discord.Embed] = []
for index, context in enumerate(torrents[:10], start=1):
torrent = context.torrent_info
meta = MetaInfo(torrent.title, torrent.description)
title_text = f"{meta.season_episode} {meta.resource_term} {meta.video_term} {meta.release_group}"
title_text = re.sub(r"\s+", " ", title_text).strip()
detail = [
f"{torrent.site_name} | {StringUtils.str_filesize(torrent.size)} | {torrent.volume_factor} | {torrent.seeders}",
meta.resource_term,
meta.video_term
]
embed = discord.Embed(
title=f"{index}. {title_text or torrent.title}",
url=torrent.page_url or discord.Embed.Empty,
description="\n".join([d for d in detail if d]),
color=0x00A86B
)
poster = getattr(torrent, "poster", None)
if poster:
embed.set_thumbnail(url=poster)
embeds.append(embed)
if embeds:
embeds[0].set_author(name=title)
return embeds
@staticmethod
def _build_default_buttons(count: int) -> List[List[dict]]:
buttons: List[List[dict]] = []
max_rows = 5
max_per_row = 5
capped = min(count, max_rows * max_per_row)
for idx in range(1, capped + 1):
row_idx = (idx - 1) // max_per_row
if len(buttons) <= row_idx:
buttons.append([])
buttons[row_idx].append({"text": f"选择 {idx}", "callback_data": str(idx)})
if count > capped:
logger.warn(f"按钮数量超过 Discord 限制,仅展示前 {capped}")
return buttons
@staticmethod
def _build_view(buttons: Optional[List[List[dict]]], link: Optional[str] = None) -> Optional[discord.ui.View]:
has_buttons = buttons and any(buttons)
if not has_buttons and not link:
return None
view = discord.ui.View(timeout=None)
if buttons:
for row_index, button_row in enumerate(buttons[:5]):
for button in button_row[:5]:
if "url" in button:
btn = discord.ui.Button(label=button.get("text", "链接"),
url=button["url"],
style=discord.ButtonStyle.link)
else:
custom_id = (button.get("callback_data") or button.get("text") or f"btn-{row_index}")[:99]
btn = discord.ui.Button(label=button.get("text", "选择")[:80],
custom_id=custom_id,
style=discord.ButtonStyle.primary)
view.add_item(btn)
elif link:
view.add_item(discord.ui.Button(label="查看详情", url=link, style=discord.ButtonStyle.link))
return view
async def _resolve_channel(self, userid: Optional[str] = None, chat_id: Optional[str] = None):
# 优先使用明确的聊天 ID
if chat_id:
channel = self._client.get_channel(int(chat_id))
if channel:
return channel
try:
return await self._client.fetch_channel(int(chat_id))
except Exception as err:
logger.warn(f"通过 chat_id 获取 Discord 频道失败:{err}")
# 私聊
if userid:
dm = await self._get_dm_channel(str(userid))
if dm:
return dm
# 配置的广播频道
if self._broadcast_channel:
return self._broadcast_channel
if self._channel_id:
channel = self._client.get_channel(self._channel_id)
if not channel:
try:
channel = await self._client.fetch_channel(self._channel_id)
except Exception as err:
logger.warn(f"通过配置的频道ID获取 Discord 频道失败:{err}")
channel = None
self._broadcast_channel = channel
if channel:
return channel
# 按 Guild 寻找一个可用文本频道
target_guilds = []
if self._guild_id:
guild = self._client.get_guild(self._guild_id)
if guild:
target_guilds.append(guild)
else:
target_guilds = list(self._client.guilds)
for guild in target_guilds:
for channel in guild.text_channels:
if guild.me and channel.permissions_for(guild.me).send_messages:
self._broadcast_channel = channel
return channel
return None
async def _get_dm_channel(self, userid: str) -> Optional[discord.DMChannel]:
if userid in self._user_dm_cache:
return self._user_dm_cache.get(userid)
try:
user_obj = self._client.get_user(int(userid)) or await self._client.fetch_user(int(userid))
if not user_obj:
return None
dm = user_obj.dm_channel or await user_obj.create_dm()
if dm:
self._user_dm_cache[userid] = dm
return dm
except Exception as err:
logger.error(f"获取 Discord 私聊失败:{err}")
return None
def _should_process_message(self, message: discord.Message) -> bool:
if isinstance(message.channel, discord.DMChannel):
return True
content = message.content or ""
# 仅处理 @Bot 或斜杠命令
if self._client.user and self._client.user.mentioned_in(message):
return True
if content.startswith("/"):
return True
return False
def _clean_bot_mention(self, content: str) -> str:
if not content:
return ""
if self._bot_user_id:
mention_pattern = rf"<@!?{self._bot_user_id}>"
content = re.sub(mention_pattern, "", content).strip()
return content
async def _post_to_ds(self, payload: Dict[str, Any]) -> None:
try:
proxy = None
if settings.PROXY:
proxy = settings.PROXY.get("https") or settings.PROXY.get("http")
async with httpx.AsyncClient(timeout=10, verify=False, proxy=proxy) as client:
await client.post(self._ds_url, json=payload)
except Exception as err:
logger.error(f"转发 Discord 消息失败:{err}")