mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-07-05 19:38:40 +08:00
Add Telegram parse mode support
This commit is contained in:
@@ -493,6 +493,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
text=message.text,
|
||||
userid=userid,
|
||||
original_chat_id=message.original_chat_id,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
elif message.voice_path:
|
||||
client.send_voice(
|
||||
@@ -500,6 +501,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
userid=userid,
|
||||
caption=message.voice_caption,
|
||||
original_chat_id=message.original_chat_id,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
else:
|
||||
client.send_msg(
|
||||
@@ -512,6 +514,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
original_message_id=message.original_message_id,
|
||||
original_chat_id=message.original_chat_id,
|
||||
disable_web_page_preview=message.disable_web_page_preview,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
|
||||
def post_medias_message(
|
||||
@@ -536,6 +539,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
buttons=message.buttons,
|
||||
original_message_id=message.original_message_id,
|
||||
original_chat_id=message.original_chat_id,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
|
||||
def post_torrents_message(
|
||||
@@ -560,6 +564,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
buttons=message.buttons,
|
||||
original_message_id=message.original_message_id,
|
||||
original_chat_id=message.original_chat_id,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
|
||||
def delete_message(
|
||||
@@ -600,6 +605,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
title: Optional[str] = None,
|
||||
buttons: Optional[List[List[dict]]] = None,
|
||||
metadata: Optional[dict] = None,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
编辑消息
|
||||
@@ -611,6 +617,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
:param title: 消息标题
|
||||
:param buttons: 新的按钮列表
|
||||
:param metadata: 其他元信息
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
:return: 编辑是否成功
|
||||
"""
|
||||
if channel != self._channel:
|
||||
@@ -626,6 +633,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
text=text,
|
||||
title=title,
|
||||
buttons=buttons,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
if result:
|
||||
return True
|
||||
@@ -713,6 +721,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
userid=userid,
|
||||
caption=message.voice_caption,
|
||||
original_chat_id=message.original_chat_id,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
else:
|
||||
result = client.send_msg(
|
||||
@@ -722,6 +731,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
|
||||
userid=userid,
|
||||
link=message.link,
|
||||
disable_web_page_preview=message.disable_web_page_preview,
|
||||
parse_mode=message.parse_mode,
|
||||
)
|
||||
if result and result.get("success"):
|
||||
return MessageResponse(
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import asyncio
|
||||
import html as html_utils
|
||||
import json
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, List, Dict, Callable, Union
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
from urllib.parse import urljoin, quote
|
||||
|
||||
from telebot import TeleBot, apihelper
|
||||
@@ -28,6 +29,15 @@ from app.utils.http import RequestUtils
|
||||
from app.utils.string import StringUtils
|
||||
|
||||
|
||||
TELEGRAM_PARSE_MODE_MARKDOWN = "MarkdownV2"
|
||||
TELEGRAM_PARSE_MODE_HTML = "HTML"
|
||||
TELEGRAM_PARSE_MODE_ALIASES = {
|
||||
"markdownv2": TELEGRAM_PARSE_MODE_MARKDOWN,
|
||||
"mdv2": TELEGRAM_PARSE_MODE_MARKDOWN,
|
||||
"html": TELEGRAM_PARSE_MODE_HTML,
|
||||
}
|
||||
|
||||
|
||||
class RetryException(Exception):
|
||||
pass
|
||||
|
||||
@@ -84,7 +94,7 @@ class Telegram:
|
||||
# 设置代理
|
||||
apihelper.proxy = settings.PROXY
|
||||
# bot
|
||||
_bot = TeleBot(self._telegram_token, parse_mode="MarkdownV2")
|
||||
_bot = TeleBot(self._telegram_token, parse_mode=TELEGRAM_PARSE_MODE_MARKDOWN)
|
||||
# 记录句柄
|
||||
self._bot = _bot
|
||||
# 获取并存储bot用户名用于@检测
|
||||
@@ -254,6 +264,80 @@ class Telegram:
|
||||
return Telegram._telegramify_item_text(item)
|
||||
return entities_to_markdownv2(item.caption_text, item.caption_entities)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_parse_mode(parse_mode: Optional[str] = None) -> str:
|
||||
"""规范化 Telegram 消息格式类型。"""
|
||||
if not parse_mode:
|
||||
return TELEGRAM_PARSE_MODE_MARKDOWN
|
||||
return TELEGRAM_PARSE_MODE_ALIASES.get(
|
||||
str(parse_mode).strip().lower(), TELEGRAM_PARSE_MODE_MARKDOWN
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _is_html_parse_mode(parse_mode: Optional[str] = None) -> bool:
|
||||
"""判断本次发送是否使用 Telegram HTML 格式。"""
|
||||
return Telegram._normalize_parse_mode(parse_mode) == TELEGRAM_PARSE_MODE_HTML
|
||||
|
||||
@staticmethod
|
||||
def _format_title(title: Optional[str], parse_mode: Optional[str] = None) -> Optional[str]:
|
||||
"""按 parse_mode 生成 Telegram 标题文本。"""
|
||||
if not title:
|
||||
return None
|
||||
if Telegram._is_html_parse_mode(parse_mode):
|
||||
return f"<b>{html_utils.escape(title).removesuffix(chr(10))}</b>"
|
||||
return f"**{standardize(title).removesuffix(chr(10))}**"
|
||||
|
||||
@staticmethod
|
||||
def _format_link(label: str, link: str, parse_mode: Optional[str] = None) -> str:
|
||||
"""按 parse_mode 生成 Telegram 链接文本。"""
|
||||
if Telegram._is_html_parse_mode(parse_mode):
|
||||
return (
|
||||
f'<a href="{html_utils.escape(link, quote=True)}">'
|
||||
f"{html_utils.escape(label)}</a>"
|
||||
)
|
||||
return f"[{label}]({link})"
|
||||
|
||||
@staticmethod
|
||||
def _format_italic(text: str, parse_mode: Optional[str] = None) -> str:
|
||||
"""按 parse_mode 生成 Telegram 斜体文本。"""
|
||||
if Telegram._is_html_parse_mode(parse_mode):
|
||||
return f"<i>{html_utils.escape(text)}</i>"
|
||||
return f"_{text}_"
|
||||
|
||||
@staticmethod
|
||||
def _format_detail_link(link: str, parse_mode: Optional[str] = None) -> str:
|
||||
"""按 parse_mode 生成查看详情链接。"""
|
||||
return Telegram._format_link("查看详情", link, parse_mode)
|
||||
|
||||
@staticmethod
|
||||
def _prepare_text(text: Optional[str], parse_mode: Optional[str] = None) -> Optional[str]:
|
||||
"""按 parse_mode 生成 Telegram 可发送文本。"""
|
||||
if not text:
|
||||
return None
|
||||
if Telegram._is_html_parse_mode(parse_mode):
|
||||
return text
|
||||
return standardize(text)
|
||||
|
||||
@staticmethod
|
||||
def _split_plain_text(text: str, limit: int) -> List[str]:
|
||||
"""按 Telegram 长度限制拆分普通文本。"""
|
||||
if not text:
|
||||
return []
|
||||
if limit <= 0:
|
||||
return [text]
|
||||
chunks = []
|
||||
remaining = text
|
||||
while remaining:
|
||||
if len(remaining) <= limit:
|
||||
chunks.append(remaining)
|
||||
break
|
||||
split_at = remaining.rfind("\n", 0, limit)
|
||||
if split_at <= 0:
|
||||
split_at = limit
|
||||
chunks.append(remaining[:split_at])
|
||||
remaining = remaining[split_at:].lstrip("\n")
|
||||
return chunks
|
||||
|
||||
@staticmethod
|
||||
def _serialize_update_payload(message: Any) -> Optional[dict]:
|
||||
"""
|
||||
@@ -481,6 +565,7 @@ class Telegram:
|
||||
original_chat_id: Optional[str] = None,
|
||||
disable_web_page_preview: Optional[bool] = None,
|
||||
stop_typing: bool = False,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
发送Telegram消息
|
||||
@@ -494,11 +579,13 @@ class Telegram:
|
||||
:param original_chat_id: 原消息的聊天ID,编辑消息时需要
|
||||
:param disable_web_page_preview: 是否禁用链接预览
|
||||
:param stop_typing: 发送完成后是否立即停止 typing
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
:return: 包含 message_id, chat_id, success 的字典
|
||||
"""
|
||||
if not self._telegram_token or not self._telegram_chat_id:
|
||||
return None
|
||||
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
# Determine target chat_id with improved logic using user mapping
|
||||
chat_id = self._determine_target_chat_id(userid, original_chat_id)
|
||||
if not title and not text:
|
||||
@@ -507,10 +594,7 @@ class Telegram:
|
||||
return {"success": False}
|
||||
|
||||
try:
|
||||
# 标准化标题后再加粗,避免**符号被显示为文本
|
||||
bold_title = (
|
||||
f"**{standardize(title).removesuffix('\n')}**" if title else None
|
||||
)
|
||||
bold_title = self._format_title(title, parse_mode)
|
||||
if bold_title and text:
|
||||
caption = f"{bold_title}\n{text}"
|
||||
elif bold_title:
|
||||
@@ -521,7 +605,7 @@ class Telegram:
|
||||
caption = ""
|
||||
|
||||
if link:
|
||||
caption = f"{caption}\n[查看详情]({link})"
|
||||
caption = f"{caption}\n{self._format_detail_link(link, parse_mode)}"
|
||||
|
||||
# 创建按钮键盘
|
||||
reply_markup = None
|
||||
@@ -538,6 +622,7 @@ class Telegram:
|
||||
buttons,
|
||||
image,
|
||||
disable_web_page_preview=disable_web_page_preview,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
self._stop_typing_if_needed(chat_id, stop_typing)
|
||||
return {
|
||||
@@ -553,6 +638,7 @@ class Telegram:
|
||||
caption=caption,
|
||||
reply_markup=reply_markup,
|
||||
disable_web_page_preview=disable_web_page_preview,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
self._stop_typing_if_needed(chat_id, stop_typing)
|
||||
if sent and hasattr(sent, "message_id"):
|
||||
@@ -577,6 +663,7 @@ class Telegram:
|
||||
caption: Optional[str] = None,
|
||||
original_chat_id: Optional[str] = None,
|
||||
stop_typing: bool = False,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
发送Telegram语音消息。
|
||||
@@ -585,6 +672,7 @@ class Telegram:
|
||||
return None
|
||||
|
||||
chat_id = self._determine_target_chat_id(userid, original_chat_id)
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
voice_file = Path(voice_path)
|
||||
if not voice_file.exists():
|
||||
logger.error(f"语音文件不存在: {voice_file}")
|
||||
@@ -596,8 +684,8 @@ class Telegram:
|
||||
sent = self._bot.send_voice(
|
||||
chat_id=chat_id,
|
||||
voice=fp,
|
||||
caption=standardize(caption) if caption else None,
|
||||
parse_mode="MarkdownV2" if caption else None,
|
||||
caption=self._prepare_text(caption, parse_mode),
|
||||
parse_mode=parse_mode if caption else None,
|
||||
)
|
||||
self._stop_typing_if_needed(chat_id, stop_typing)
|
||||
if sent and hasattr(sent, "message_id"):
|
||||
@@ -626,6 +714,7 @@ class Telegram:
|
||||
file_name: Optional[str] = None,
|
||||
original_chat_id: Optional[str] = None,
|
||||
stop_typing: bool = False,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
发送本地图片或文件给 Telegram 用户。
|
||||
@@ -634,6 +723,7 @@ class Telegram:
|
||||
return None
|
||||
|
||||
chat_id = self._determine_target_chat_id(userid, original_chat_id)
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
local_file = Path(file_path)
|
||||
if not local_file.exists() or not local_file.is_file():
|
||||
logger.error(f"附件文件不存在: {local_file}")
|
||||
@@ -645,9 +735,7 @@ class Telegram:
|
||||
is_image = suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}
|
||||
|
||||
try:
|
||||
bold_title = (
|
||||
f"**{standardize(title).removesuffix('\n')}**" if title else None
|
||||
)
|
||||
bold_title = self._format_title(title, parse_mode)
|
||||
if bold_title and text:
|
||||
caption = f"{bold_title}\n{text}"
|
||||
elif bold_title:
|
||||
@@ -660,15 +748,15 @@ class Telegram:
|
||||
sent = self._bot.send_photo(
|
||||
chat_id=chat_id,
|
||||
photo=fp,
|
||||
caption=standardize(caption) if caption else None,
|
||||
parse_mode="MarkdownV2" if caption else None,
|
||||
caption=self._prepare_text(caption, parse_mode),
|
||||
parse_mode=parse_mode if caption else None,
|
||||
)
|
||||
else:
|
||||
sent = self._bot.send_document(
|
||||
chat_id=chat_id,
|
||||
document=(send_name, fp),
|
||||
caption=standardize(caption) if caption else None,
|
||||
parse_mode="MarkdownV2" if caption else None,
|
||||
caption=self._prepare_text(caption, parse_mode),
|
||||
parse_mode=parse_mode if caption else None,
|
||||
)
|
||||
self._stop_typing_if_needed(chat_id, stop_typing)
|
||||
if sent and hasattr(sent, "message_id"):
|
||||
@@ -717,6 +805,7 @@ class Telegram:
|
||||
original_message_id: Optional[int] = None,
|
||||
original_chat_id: Optional[str] = None,
|
||||
stop_typing: bool = False,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
发送媒体列表消息
|
||||
@@ -728,38 +817,39 @@ class Telegram:
|
||||
:param original_message_id: 原消息ID,如果提供则编辑原消息
|
||||
:param original_chat_id: 原消息的聊天ID,编辑消息时需要
|
||||
:param stop_typing: 发送完成后是否立即停止 typing
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
"""
|
||||
if not self._telegram_token or not self._telegram_chat_id:
|
||||
return None
|
||||
|
||||
# 列表消息也可能是一次交互的最终响应,默认在发送后结束 typing。
|
||||
chat_id = self._determine_target_chat_id(userid, original_chat_id)
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
try:
|
||||
index, image, caption = 1, "", "*%s*" % title
|
||||
index, image = 1, ""
|
||||
caption = self._format_title(title, parse_mode) or ""
|
||||
for media in medias:
|
||||
if not image:
|
||||
image = media.get_message_image()
|
||||
media_link = self._format_link(
|
||||
media.title_year, media.detail_link, parse_mode
|
||||
)
|
||||
type_text = f"类型:{media.type.value}"
|
||||
if media.vote_average:
|
||||
caption = "%s\n%s. [%s](%s)\n_%s,%s_" % (
|
||||
caption,
|
||||
index,
|
||||
media.title_year,
|
||||
media.detail_link,
|
||||
f"类型:{media.type.value}",
|
||||
f"评分:{media.vote_average}",
|
||||
score_text = f"评分:{media.vote_average}"
|
||||
caption = (
|
||||
f"{caption}\n{index}. {media_link}\n"
|
||||
f"{self._format_italic(f'{type_text},{score_text}', parse_mode)}"
|
||||
)
|
||||
else:
|
||||
caption = "%s\n%s. [%s](%s)\n_%s_" % (
|
||||
caption,
|
||||
index,
|
||||
media.title_year,
|
||||
media.detail_link,
|
||||
f"类型:{media.type.value}",
|
||||
caption = (
|
||||
f"{caption}\n{index}. {media_link}\n"
|
||||
f"{self._format_italic(type_text, parse_mode)}"
|
||||
)
|
||||
index += 1
|
||||
|
||||
if link:
|
||||
caption = f"{caption}\n[查看详情]({link})"
|
||||
caption = f"{caption}\n{self._format_detail_link(link, parse_mode)}"
|
||||
|
||||
# 创建按钮键盘
|
||||
reply_markup = None
|
||||
@@ -770,7 +860,12 @@ class Telegram:
|
||||
if original_message_id and original_chat_id:
|
||||
# 编辑消息
|
||||
return self.__edit_message(
|
||||
original_chat_id, original_message_id, caption, buttons, image
|
||||
original_chat_id,
|
||||
original_message_id,
|
||||
caption,
|
||||
buttons,
|
||||
image,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
else:
|
||||
# 发送新消息
|
||||
@@ -779,6 +874,7 @@ class Telegram:
|
||||
image=image,
|
||||
caption=caption,
|
||||
reply_markup=reply_markup,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
|
||||
except Exception as msg_e:
|
||||
@@ -797,6 +893,7 @@ class Telegram:
|
||||
original_message_id: Optional[int] = None,
|
||||
original_chat_id: Optional[str] = None,
|
||||
stop_typing: bool = False,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
发送种子列表消息
|
||||
@@ -808,14 +905,17 @@ class Telegram:
|
||||
:param original_message_id: 原消息ID,如果提供则编辑原消息
|
||||
:param original_chat_id: 原消息的聊天ID,编辑消息时需要
|
||||
:param stop_typing: 发送完成后是否立即停止 typing
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
"""
|
||||
if not self._telegram_token or not self._telegram_chat_id:
|
||||
return None
|
||||
|
||||
# 资源列表是搜索交互的常见出口,默认在发送后结束 typing。
|
||||
chat_id = self._determine_target_chat_id(userid, original_chat_id)
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
try:
|
||||
index, caption = 1, "*%s*" % title
|
||||
index = 1
|
||||
caption = self._format_title(title, parse_mode) or ""
|
||||
image = torrents[0].media_info.get_message_image()
|
||||
for context in torrents:
|
||||
torrent = context.torrent_info
|
||||
@@ -831,14 +931,20 @@ class Telegram:
|
||||
title = re.sub(r"\s+", " ", title).strip()
|
||||
free = torrent.volume_factor
|
||||
seeder = f"{torrent.seeders}↑"
|
||||
site_name = (
|
||||
html_utils.escape(site_name)
|
||||
if self._is_html_parse_mode(parse_mode)
|
||||
else site_name
|
||||
)
|
||||
title_link = self._format_link(title, link, parse_mode)
|
||||
caption = (
|
||||
f"{caption}\n{index}.【{site_name}】[{title}]({link}) "
|
||||
f"{caption}\n{index}.【{site_name}】{title_link} "
|
||||
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}"
|
||||
)
|
||||
index += 1
|
||||
|
||||
if link:
|
||||
caption = f"{caption}\n[查看详情]({link})"
|
||||
caption = f"{caption}\n{self._format_detail_link(link, parse_mode)}"
|
||||
|
||||
# 创建按钮键盘
|
||||
reply_markup = None
|
||||
@@ -849,7 +955,12 @@ class Telegram:
|
||||
if original_message_id and original_chat_id:
|
||||
# 编辑消息(种子消息通常没有图片)
|
||||
return self.__edit_message(
|
||||
original_chat_id, original_message_id, caption, buttons, image
|
||||
original_chat_id,
|
||||
original_message_id,
|
||||
caption,
|
||||
buttons,
|
||||
image,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
else:
|
||||
# 发送新消息
|
||||
@@ -858,6 +969,7 @@ class Telegram:
|
||||
image=image,
|
||||
caption=caption,
|
||||
reply_markup=reply_markup,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
|
||||
except Exception as msg_e:
|
||||
@@ -955,6 +1067,7 @@ class Telegram:
|
||||
title: Optional[str] = None,
|
||||
buttons: Optional[List[List[dict]]] = None,
|
||||
stop_typing: bool = False,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
编辑Telegram消息(公开方法)
|
||||
@@ -964,15 +1077,17 @@ class Telegram:
|
||||
:param title: 消息标题
|
||||
:param buttons: 新的按钮列表
|
||||
:param stop_typing: 编辑完成后是否立即停止 typing
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
:return: 编辑是否成功
|
||||
"""
|
||||
if not self._bot:
|
||||
return None
|
||||
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
try:
|
||||
# 组合标题和文本
|
||||
if title:
|
||||
bold_title = f"**{standardize(title).removesuffix(chr(10))}**"
|
||||
bold_title = self._format_title(title, parse_mode)
|
||||
caption = f"{bold_title}\n{text}" if text else bold_title
|
||||
elif text:
|
||||
caption = text
|
||||
@@ -984,6 +1099,7 @@ class Telegram:
|
||||
message_id=int(message_id),
|
||||
text=caption,
|
||||
buttons=buttons,
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"编辑Telegram消息异常: {str(e)}")
|
||||
@@ -1006,6 +1122,7 @@ class Telegram:
|
||||
buttons: Optional[List[List[dict]]] = None,
|
||||
image: Optional[str] = None,
|
||||
disable_web_page_preview: Optional[bool] = None,
|
||||
parse_mode: Optional[str] = None,
|
||||
) -> Optional[bool]:
|
||||
"""
|
||||
编辑已发送的消息
|
||||
@@ -1015,11 +1132,13 @@ class Telegram:
|
||||
:param buttons: 按钮列表
|
||||
:param image: 图片URL或路径
|
||||
:param disable_web_page_preview: 是否禁用链接预览(仅纯文本编辑时生效)
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
:return: 编辑是否成功
|
||||
"""
|
||||
if not self._bot:
|
||||
return None
|
||||
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
try:
|
||||
# 创建按钮键盘
|
||||
reply_markup = None
|
||||
@@ -1029,7 +1148,9 @@ class Telegram:
|
||||
if image:
|
||||
# 如果有图片,使用edit_message_media
|
||||
media = InputMediaPhoto(
|
||||
media=image, caption=standardize(text), parse_mode="MarkdownV2"
|
||||
media=image,
|
||||
caption=self._prepare_text(text, parse_mode),
|
||||
parse_mode=parse_mode,
|
||||
)
|
||||
self._bot.edit_message_media(
|
||||
chat_id=chat_id,
|
||||
@@ -1042,8 +1163,8 @@ class Telegram:
|
||||
edit_text_kwargs: Dict[str, Any] = {
|
||||
"chat_id": chat_id,
|
||||
"message_id": message_id,
|
||||
"text": standardize(text),
|
||||
"parse_mode": "MarkdownV2",
|
||||
"text": self._prepare_text(text, parse_mode),
|
||||
"parse_mode": parse_mode,
|
||||
"reply_markup": reply_markup,
|
||||
}
|
||||
if disable_web_page_preview is not None:
|
||||
@@ -1058,8 +1179,8 @@ class Telegram:
|
||||
self._bot.edit_message_caption(
|
||||
chat_id=chat_id,
|
||||
message_id=message_id,
|
||||
caption=standardize(text),
|
||||
parse_mode="MarkdownV2",
|
||||
caption=self._prepare_text(text, parse_mode),
|
||||
parse_mode=parse_mode,
|
||||
reply_markup=reply_markup,
|
||||
)
|
||||
return True
|
||||
@@ -1074,16 +1195,19 @@ class Telegram:
|
||||
caption="",
|
||||
reply_markup: Optional[InlineKeyboardMarkup] = None,
|
||||
disable_web_page_preview: Optional[bool] = None,
|
||||
parse_mode: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
向Telegram发送报文,返回发送的消息对象
|
||||
:param reply_markup: 内联键盘
|
||||
:param disable_web_page_preview: 是否禁用链接预览
|
||||
:param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
:return: 发送成功返回消息对象,失败返回None
|
||||
"""
|
||||
parse_mode = self._normalize_parse_mode(parse_mode)
|
||||
kwargs = {
|
||||
"chat_id": userid or self._telegram_chat_id,
|
||||
"parse_mode": "MarkdownV2",
|
||||
"parse_mode": parse_mode,
|
||||
"reply_markup": reply_markup,
|
||||
}
|
||||
# 处理图片
|
||||
@@ -1096,6 +1220,14 @@ class Telegram:
|
||||
ret = self.__send_short_message(image, caption,
|
||||
disable_web_page_preview=disable_web_page_preview,
|
||||
**kwargs)
|
||||
elif self._is_html_parse_mode(parse_mode):
|
||||
ret = self.__send_long_plain_message(
|
||||
image,
|
||||
caption,
|
||||
caption_limit,
|
||||
disable_web_page_preview=disable_web_page_preview,
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
sent_idx = set()
|
||||
ret = self.__send_long_message(image, caption, sent_idx,
|
||||
@@ -1125,20 +1257,61 @@ class Telegram:
|
||||
"""
|
||||
发送短消息
|
||||
"""
|
||||
parse_mode = kwargs.get("parse_mode")
|
||||
try:
|
||||
if image:
|
||||
return self._bot.send_photo(
|
||||
photo=image, caption=standardize(caption), **kwargs
|
||||
photo=image,
|
||||
caption=self._prepare_text(caption, parse_mode),
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
return self._bot.send_message(
|
||||
text=standardize(caption),
|
||||
text=self._prepare_text(caption, parse_mode),
|
||||
disable_web_page_preview=disable_web_page_preview,
|
||||
**kwargs
|
||||
)
|
||||
except Exception:
|
||||
raise RetryException(f"发送{'图片' if image else '文本'}消息失败")
|
||||
|
||||
@retry(RetryException, logger=logger)
|
||||
def __send_long_plain_message(
|
||||
self,
|
||||
image: Optional[bytes],
|
||||
caption: str,
|
||||
caption_limit: int,
|
||||
disable_web_page_preview: Optional[bool] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
按 Telegram 长度限制发送长文本。
|
||||
"""
|
||||
reply_markup = kwargs.pop("reply_markup", None)
|
||||
chunks = self._split_plain_text(caption, caption_limit)
|
||||
ret = None
|
||||
try:
|
||||
for index, chunk in enumerate(chunks):
|
||||
current_reply_markup = reply_markup if index == 0 else None
|
||||
if image and index == 0:
|
||||
ret = self._bot.send_photo(
|
||||
**kwargs,
|
||||
photo=image,
|
||||
caption=chunk,
|
||||
reply_markup=current_reply_markup,
|
||||
)
|
||||
continue
|
||||
msg_kwargs = dict(**kwargs)
|
||||
if disable_web_page_preview is not None:
|
||||
msg_kwargs["disable_web_page_preview"] = disable_web_page_preview
|
||||
ret = self._bot.send_message(
|
||||
**msg_kwargs,
|
||||
text=chunk,
|
||||
reply_markup=current_reply_markup,
|
||||
)
|
||||
return ret
|
||||
except Exception as err:
|
||||
raise RetryException("长消息发送失败") from err
|
||||
|
||||
@retry(RetryException, logger=logger)
|
||||
def __send_long_message(
|
||||
self, image: Optional[bytes], caption: str, sent_idx: set,
|
||||
|
||||
@@ -223,6 +223,8 @@ class Notification(BaseModel):
|
||||
original_chat_id: Optional[str] = None
|
||||
# 是否禁用链接预览(仅Telegram支持)
|
||||
disable_web_page_preview: Optional[bool] = None
|
||||
# Telegram 消息格式类型,默认 MarkdownV2,可传 HTML
|
||||
parse_mode: Optional[str] = None
|
||||
# 是否写入消息历史
|
||||
save_history: bool = True
|
||||
|
||||
|
||||
@@ -2,12 +2,17 @@
|
||||
"""
|
||||
Telegram 模块单元测试(pytest 原生)。
|
||||
"""
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from app.core.context import MediaInfo, Context, TorrentInfo
|
||||
from app.core.metainfo import MetaInfo
|
||||
from app.modules.telegram import TelegramModule
|
||||
from app.modules.telegram.telegram import Telegram
|
||||
from app.schemas import Notification
|
||||
from app.schemas.types import MessageChannel
|
||||
from app.schemas.types import MediaType
|
||||
|
||||
|
||||
@@ -243,6 +248,56 @@ def test_send_msg_markdown_escaping(telegram):
|
||||
|
||||
# 验证返回值:send_msg 失败时返回 {"success": False}(非空字典),故显式断言 success
|
||||
assert result and result.get("success")
|
||||
send_kwargs = telegram.bot.send_message.call_args.kwargs
|
||||
assert send_kwargs["parse_mode"] == "MarkdownV2"
|
||||
assert send_kwargs["text"].startswith("*测试标题*\n")
|
||||
|
||||
|
||||
def test_send_msg_with_html_parse_mode_keeps_html(telegram):
|
||||
"""HTML模式发送时应保留调用方传入的HTML内容"""
|
||||
result = telegram.send_msg(
|
||||
title="测试 <标题>",
|
||||
text="<blockquote>第一行</blockquote><b>加粗</b>",
|
||||
link="https://example.com/?a=1&b=2",
|
||||
parse_mode="HTML",
|
||||
)
|
||||
|
||||
assert result and result.get("success")
|
||||
send_kwargs = telegram.bot.send_message.call_args.kwargs
|
||||
assert send_kwargs["parse_mode"] == "HTML"
|
||||
assert send_kwargs["text"] == (
|
||||
'<b>测试 <标题></b>\n'
|
||||
'<blockquote>第一行</blockquote><b>加粗</b>\n'
|
||||
'<a href="https://example.com/?a=1&b=2">查看详情</a>'
|
||||
)
|
||||
|
||||
|
||||
def test_telegram_module_passes_parse_mode_to_client():
|
||||
"""模块发送通知时应透传消息指定的parse_mode"""
|
||||
module = TelegramModule()
|
||||
client = Mock()
|
||||
|
||||
with patch.object(
|
||||
module,
|
||||
"get_configs",
|
||||
return_value={"telegram-test": SimpleNamespace(name="telegram-test")},
|
||||
), patch.object(
|
||||
module, "check_message", return_value=True
|
||||
), patch.object(
|
||||
module, "get_instance", return_value=client
|
||||
):
|
||||
module.post_message(
|
||||
Notification(
|
||||
channel=MessageChannel.Telegram,
|
||||
source="telegram-test",
|
||||
title="HTML",
|
||||
text="<b>正文</b>",
|
||||
parse_mode="HTML",
|
||||
)
|
||||
)
|
||||
|
||||
client.send_msg.assert_called_once()
|
||||
assert client.send_msg.call_args.kwargs["parse_mode"] == "HTML"
|
||||
|
||||
|
||||
def test_edit_msg_falls_back_to_caption_when_original_message_has_no_text(telegram):
|
||||
@@ -274,6 +329,22 @@ def test_edit_msg_falls_back_to_caption_when_original_message_has_no_text(telegr
|
||||
assert caption_kwargs["reply_markup"] is not None
|
||||
|
||||
|
||||
def test_edit_msg_with_html_parse_mode_keeps_html(telegram):
|
||||
"""HTML模式编辑消息时应保留HTML内容"""
|
||||
result = telegram.edit_msg(
|
||||
chat_id="1051253579",
|
||||
message_id="110502",
|
||||
title="标题",
|
||||
text="<blockquote>请选择</blockquote>",
|
||||
parse_mode="HTML",
|
||||
)
|
||||
|
||||
assert result is True
|
||||
edit_kwargs = telegram.bot.edit_message_text.call_args.kwargs
|
||||
assert edit_kwargs["parse_mode"] == "HTML"
|
||||
assert edit_kwargs["text"] == "<b>标题</b>\n<blockquote>请选择</blockquote>"
|
||||
|
||||
|
||||
def test_edit_msg_keeps_other_edit_errors_failed(telegram):
|
||||
"""非图片 caption 场景的编辑错误不应被错误标记为成功。"""
|
||||
telegram.bot.edit_message_text.side_effect = Exception(
|
||||
|
||||
Reference in New Issue
Block a user