From 683e07a102a314ab4c40d90b8114bfdfc521acc3 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 21 Jun 2026 10:20:30 +0800 Subject: [PATCH] Add Telegram parse mode support --- app/modules/telegram/__init__.py | 10 ++ app/modules/telegram/telegram.py | 263 +++++++++++++++++++++++++------ app/schemas/message.py | 2 + tests/test_telegram.py | 73 ++++++++- 4 files changed, 302 insertions(+), 46 deletions(-) diff --git a/app/modules/telegram/__init__.py b/app/modules/telegram/__init__.py index f051fd64..5c4d0b75 100644 --- a/app/modules/telegram/__init__.py +++ b/app/modules/telegram/__init__.py @@ -493,6 +493,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): text=message.text, userid=userid, original_chat_id=message.original_chat_id, + parse_mode=message.parse_mode, ) elif message.voice_path: client.send_voice( @@ -500,6 +501,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): userid=userid, caption=message.voice_caption, original_chat_id=message.original_chat_id, + parse_mode=message.parse_mode, ) else: client.send_msg( @@ -512,6 +514,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): original_message_id=message.original_message_id, original_chat_id=message.original_chat_id, disable_web_page_preview=message.disable_web_page_preview, + parse_mode=message.parse_mode, ) def post_medias_message( @@ -536,6 +539,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): buttons=message.buttons, original_message_id=message.original_message_id, original_chat_id=message.original_chat_id, + parse_mode=message.parse_mode, ) def post_torrents_message( @@ -560,6 +564,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): buttons=message.buttons, original_message_id=message.original_message_id, original_chat_id=message.original_chat_id, + parse_mode=message.parse_mode, ) def delete_message( @@ -600,6 +605,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): title: Optional[str] = None, buttons: Optional[List[List[dict]]] = None, metadata: Optional[dict] = None, + parse_mode: Optional[str] = None, ) -> Optional[bool]: """ 编辑消息 @@ -611,6 +617,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): :param title: 消息标题 :param buttons: 新的按钮列表 :param metadata: 其他元信息 + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML :return: 编辑是否成功 """ if channel != self._channel: @@ -626,6 +633,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): text=text, title=title, buttons=buttons, + parse_mode=parse_mode, ) if result: return True @@ -713,6 +721,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): userid=userid, caption=message.voice_caption, original_chat_id=message.original_chat_id, + parse_mode=message.parse_mode, ) else: result = client.send_msg( @@ -722,6 +731,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): userid=userid, link=message.link, disable_web_page_preview=message.disable_web_page_preview, + parse_mode=message.parse_mode, ) if result and result.get("success"): return MessageResponse( diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 1703d128..a16db0aa 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -1,10 +1,11 @@ import asyncio +import html as html_utils import json import re import threading import time from pathlib import Path -from typing import Any, Optional, List, Dict, Callable, Union +from typing import Any, Callable, Dict, List, Optional, Union from urllib.parse import urljoin, quote from telebot import TeleBot, apihelper @@ -28,6 +29,15 @@ from app.utils.http import RequestUtils from app.utils.string import StringUtils +TELEGRAM_PARSE_MODE_MARKDOWN = "MarkdownV2" +TELEGRAM_PARSE_MODE_HTML = "HTML" +TELEGRAM_PARSE_MODE_ALIASES = { + "markdownv2": TELEGRAM_PARSE_MODE_MARKDOWN, + "mdv2": TELEGRAM_PARSE_MODE_MARKDOWN, + "html": TELEGRAM_PARSE_MODE_HTML, +} + + class RetryException(Exception): pass @@ -84,7 +94,7 @@ class Telegram: # 设置代理 apihelper.proxy = settings.PROXY # bot - _bot = TeleBot(self._telegram_token, parse_mode="MarkdownV2") + _bot = TeleBot(self._telegram_token, parse_mode=TELEGRAM_PARSE_MODE_MARKDOWN) # 记录句柄 self._bot = _bot # 获取并存储bot用户名用于@检测 @@ -254,6 +264,80 @@ class Telegram: return Telegram._telegramify_item_text(item) return entities_to_markdownv2(item.caption_text, item.caption_entities) + @staticmethod + def _normalize_parse_mode(parse_mode: Optional[str] = None) -> str: + """规范化 Telegram 消息格式类型。""" + if not parse_mode: + return TELEGRAM_PARSE_MODE_MARKDOWN + return TELEGRAM_PARSE_MODE_ALIASES.get( + str(parse_mode).strip().lower(), TELEGRAM_PARSE_MODE_MARKDOWN + ) + + @staticmethod + def _is_html_parse_mode(parse_mode: Optional[str] = None) -> bool: + """判断本次发送是否使用 Telegram HTML 格式。""" + return Telegram._normalize_parse_mode(parse_mode) == TELEGRAM_PARSE_MODE_HTML + + @staticmethod + def _format_title(title: Optional[str], parse_mode: Optional[str] = None) -> Optional[str]: + """按 parse_mode 生成 Telegram 标题文本。""" + if not title: + return None + if Telegram._is_html_parse_mode(parse_mode): + return f"{html_utils.escape(title).removesuffix(chr(10))}" + return f"**{standardize(title).removesuffix(chr(10))}**" + + @staticmethod + def _format_link(label: str, link: str, parse_mode: Optional[str] = None) -> str: + """按 parse_mode 生成 Telegram 链接文本。""" + if Telegram._is_html_parse_mode(parse_mode): + return ( + f'' + f"{html_utils.escape(label)}" + ) + return f"[{label}]({link})" + + @staticmethod + def _format_italic(text: str, parse_mode: Optional[str] = None) -> str: + """按 parse_mode 生成 Telegram 斜体文本。""" + if Telegram._is_html_parse_mode(parse_mode): + return f"{html_utils.escape(text)}" + return f"_{text}_" + + @staticmethod + def _format_detail_link(link: str, parse_mode: Optional[str] = None) -> str: + """按 parse_mode 生成查看详情链接。""" + return Telegram._format_link("查看详情", link, parse_mode) + + @staticmethod + def _prepare_text(text: Optional[str], parse_mode: Optional[str] = None) -> Optional[str]: + """按 parse_mode 生成 Telegram 可发送文本。""" + if not text: + return None + if Telegram._is_html_parse_mode(parse_mode): + return text + return standardize(text) + + @staticmethod + def _split_plain_text(text: str, limit: int) -> List[str]: + """按 Telegram 长度限制拆分普通文本。""" + if not text: + return [] + if limit <= 0: + return [text] + chunks = [] + remaining = text + while remaining: + if len(remaining) <= limit: + chunks.append(remaining) + break + split_at = remaining.rfind("\n", 0, limit) + if split_at <= 0: + split_at = limit + chunks.append(remaining[:split_at]) + remaining = remaining[split_at:].lstrip("\n") + return chunks + @staticmethod def _serialize_update_payload(message: Any) -> Optional[dict]: """ @@ -481,6 +565,7 @@ class Telegram: original_chat_id: Optional[str] = None, disable_web_page_preview: Optional[bool] = None, stop_typing: bool = False, + parse_mode: Optional[str] = None, ) -> Optional[dict]: """ 发送Telegram消息 @@ -494,11 +579,13 @@ class Telegram: :param original_chat_id: 原消息的聊天ID,编辑消息时需要 :param disable_web_page_preview: 是否禁用链接预览 :param stop_typing: 发送完成后是否立即停止 typing + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML :return: 包含 message_id, chat_id, success 的字典 """ if not self._telegram_token or not self._telegram_chat_id: return None + parse_mode = self._normalize_parse_mode(parse_mode) # Determine target chat_id with improved logic using user mapping chat_id = self._determine_target_chat_id(userid, original_chat_id) if not title and not text: @@ -507,10 +594,7 @@ class Telegram: return {"success": False} try: - # 标准化标题后再加粗,避免**符号被显示为文本 - bold_title = ( - f"**{standardize(title).removesuffix('\n')}**" if title else None - ) + bold_title = self._format_title(title, parse_mode) if bold_title and text: caption = f"{bold_title}\n{text}" elif bold_title: @@ -521,7 +605,7 @@ class Telegram: caption = "" if link: - caption = f"{caption}\n[查看详情]({link})" + caption = f"{caption}\n{self._format_detail_link(link, parse_mode)}" # 创建按钮键盘 reply_markup = None @@ -538,6 +622,7 @@ class Telegram: buttons, image, disable_web_page_preview=disable_web_page_preview, + parse_mode=parse_mode, ) self._stop_typing_if_needed(chat_id, stop_typing) return { @@ -553,6 +638,7 @@ class Telegram: caption=caption, reply_markup=reply_markup, disable_web_page_preview=disable_web_page_preview, + parse_mode=parse_mode, ) self._stop_typing_if_needed(chat_id, stop_typing) if sent and hasattr(sent, "message_id"): @@ -577,6 +663,7 @@ class Telegram: caption: Optional[str] = None, original_chat_id: Optional[str] = None, stop_typing: bool = False, + parse_mode: Optional[str] = None, ) -> Optional[dict]: """ 发送Telegram语音消息。 @@ -585,6 +672,7 @@ class Telegram: return None chat_id = self._determine_target_chat_id(userid, original_chat_id) + parse_mode = self._normalize_parse_mode(parse_mode) voice_file = Path(voice_path) if not voice_file.exists(): logger.error(f"语音文件不存在: {voice_file}") @@ -596,8 +684,8 @@ class Telegram: sent = self._bot.send_voice( chat_id=chat_id, voice=fp, - caption=standardize(caption) if caption else None, - parse_mode="MarkdownV2" if caption else None, + caption=self._prepare_text(caption, parse_mode), + parse_mode=parse_mode if caption else None, ) self._stop_typing_if_needed(chat_id, stop_typing) if sent and hasattr(sent, "message_id"): @@ -626,6 +714,7 @@ class Telegram: file_name: Optional[str] = None, original_chat_id: Optional[str] = None, stop_typing: bool = False, + parse_mode: Optional[str] = None, ) -> Optional[dict]: """ 发送本地图片或文件给 Telegram 用户。 @@ -634,6 +723,7 @@ class Telegram: return None chat_id = self._determine_target_chat_id(userid, original_chat_id) + parse_mode = self._normalize_parse_mode(parse_mode) local_file = Path(file_path) if not local_file.exists() or not local_file.is_file(): logger.error(f"附件文件不存在: {local_file}") @@ -645,9 +735,7 @@ class Telegram: is_image = suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"} try: - bold_title = ( - f"**{standardize(title).removesuffix('\n')}**" if title else None - ) + bold_title = self._format_title(title, parse_mode) if bold_title and text: caption = f"{bold_title}\n{text}" elif bold_title: @@ -660,15 +748,15 @@ class Telegram: sent = self._bot.send_photo( chat_id=chat_id, photo=fp, - caption=standardize(caption) if caption else None, - parse_mode="MarkdownV2" if caption else None, + caption=self._prepare_text(caption, parse_mode), + parse_mode=parse_mode if caption else None, ) else: sent = self._bot.send_document( chat_id=chat_id, document=(send_name, fp), - caption=standardize(caption) if caption else None, - parse_mode="MarkdownV2" if caption else None, + caption=self._prepare_text(caption, parse_mode), + parse_mode=parse_mode if caption else None, ) self._stop_typing_if_needed(chat_id, stop_typing) if sent and hasattr(sent, "message_id"): @@ -717,6 +805,7 @@ class Telegram: original_message_id: Optional[int] = None, original_chat_id: Optional[str] = None, stop_typing: bool = False, + parse_mode: Optional[str] = None, ) -> Optional[bool]: """ 发送媒体列表消息 @@ -728,38 +817,39 @@ class Telegram: :param original_message_id: 原消息ID,如果提供则编辑原消息 :param original_chat_id: 原消息的聊天ID,编辑消息时需要 :param stop_typing: 发送完成后是否立即停止 typing + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML """ if not self._telegram_token or not self._telegram_chat_id: return None # 列表消息也可能是一次交互的最终响应,默认在发送后结束 typing。 chat_id = self._determine_target_chat_id(userid, original_chat_id) + parse_mode = self._normalize_parse_mode(parse_mode) try: - index, image, caption = 1, "", "*%s*" % title + index, image = 1, "" + caption = self._format_title(title, parse_mode) or "" for media in medias: if not image: image = media.get_message_image() + media_link = self._format_link( + media.title_year, media.detail_link, parse_mode + ) + type_text = f"类型:{media.type.value}" if media.vote_average: - caption = "%s\n%s. [%s](%s)\n_%s,%s_" % ( - caption, - index, - media.title_year, - media.detail_link, - f"类型:{media.type.value}", - f"评分:{media.vote_average}", + score_text = f"评分:{media.vote_average}" + caption = ( + f"{caption}\n{index}. {media_link}\n" + f"{self._format_italic(f'{type_text},{score_text}', parse_mode)}" ) else: - caption = "%s\n%s. [%s](%s)\n_%s_" % ( - caption, - index, - media.title_year, - media.detail_link, - f"类型:{media.type.value}", + caption = ( + f"{caption}\n{index}. {media_link}\n" + f"{self._format_italic(type_text, parse_mode)}" ) index += 1 if link: - caption = f"{caption}\n[查看详情]({link})" + caption = f"{caption}\n{self._format_detail_link(link, parse_mode)}" # 创建按钮键盘 reply_markup = None @@ -770,7 +860,12 @@ class Telegram: if original_message_id and original_chat_id: # 编辑消息 return self.__edit_message( - original_chat_id, original_message_id, caption, buttons, image + original_chat_id, + original_message_id, + caption, + buttons, + image, + parse_mode=parse_mode, ) else: # 发送新消息 @@ -779,6 +874,7 @@ class Telegram: image=image, caption=caption, reply_markup=reply_markup, + parse_mode=parse_mode, ) except Exception as msg_e: @@ -797,6 +893,7 @@ class Telegram: original_message_id: Optional[int] = None, original_chat_id: Optional[str] = None, stop_typing: bool = False, + parse_mode: Optional[str] = None, ) -> Optional[bool]: """ 发送种子列表消息 @@ -808,14 +905,17 @@ class Telegram: :param original_message_id: 原消息ID,如果提供则编辑原消息 :param original_chat_id: 原消息的聊天ID,编辑消息时需要 :param stop_typing: 发送完成后是否立即停止 typing + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML """ if not self._telegram_token or not self._telegram_chat_id: return None # 资源列表是搜索交互的常见出口,默认在发送后结束 typing。 chat_id = self._determine_target_chat_id(userid, original_chat_id) + parse_mode = self._normalize_parse_mode(parse_mode) try: - index, caption = 1, "*%s*" % title + index = 1 + caption = self._format_title(title, parse_mode) or "" image = torrents[0].media_info.get_message_image() for context in torrents: torrent = context.torrent_info @@ -831,14 +931,20 @@ class Telegram: title = re.sub(r"\s+", " ", title).strip() free = torrent.volume_factor seeder = f"{torrent.seeders}↑" + site_name = ( + html_utils.escape(site_name) + if self._is_html_parse_mode(parse_mode) + else site_name + ) + title_link = self._format_link(title, link, parse_mode) caption = ( - f"{caption}\n{index}.【{site_name}】[{title}]({link}) " + f"{caption}\n{index}.【{site_name}】{title_link} " f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}" ) index += 1 if link: - caption = f"{caption}\n[查看详情]({link})" + caption = f"{caption}\n{self._format_detail_link(link, parse_mode)}" # 创建按钮键盘 reply_markup = None @@ -849,7 +955,12 @@ class Telegram: if original_message_id and original_chat_id: # 编辑消息(种子消息通常没有图片) return self.__edit_message( - original_chat_id, original_message_id, caption, buttons, image + original_chat_id, + original_message_id, + caption, + buttons, + image, + parse_mode=parse_mode, ) else: # 发送新消息 @@ -858,6 +969,7 @@ class Telegram: image=image, caption=caption, reply_markup=reply_markup, + parse_mode=parse_mode, ) except Exception as msg_e: @@ -955,6 +1067,7 @@ class Telegram: title: Optional[str] = None, buttons: Optional[List[List[dict]]] = None, stop_typing: bool = False, + parse_mode: Optional[str] = None, ) -> Optional[bool]: """ 编辑Telegram消息(公开方法) @@ -964,15 +1077,17 @@ class Telegram: :param title: 消息标题 :param buttons: 新的按钮列表 :param stop_typing: 编辑完成后是否立即停止 typing + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML :return: 编辑是否成功 """ if not self._bot: return None + parse_mode = self._normalize_parse_mode(parse_mode) try: # 组合标题和文本 if title: - bold_title = f"**{standardize(title).removesuffix(chr(10))}**" + bold_title = self._format_title(title, parse_mode) caption = f"{bold_title}\n{text}" if text else bold_title elif text: caption = text @@ -984,6 +1099,7 @@ class Telegram: message_id=int(message_id), text=caption, buttons=buttons, + parse_mode=parse_mode, ) except Exception as e: logger.error(f"编辑Telegram消息异常: {str(e)}") @@ -1006,6 +1122,7 @@ class Telegram: buttons: Optional[List[List[dict]]] = None, image: Optional[str] = None, disable_web_page_preview: Optional[bool] = None, + parse_mode: Optional[str] = None, ) -> Optional[bool]: """ 编辑已发送的消息 @@ -1015,11 +1132,13 @@ class Telegram: :param buttons: 按钮列表 :param image: 图片URL或路径 :param disable_web_page_preview: 是否禁用链接预览(仅纯文本编辑时生效) + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML :return: 编辑是否成功 """ if not self._bot: return None + parse_mode = self._normalize_parse_mode(parse_mode) try: # 创建按钮键盘 reply_markup = None @@ -1029,7 +1148,9 @@ class Telegram: if image: # 如果有图片,使用edit_message_media media = InputMediaPhoto( - media=image, caption=standardize(text), parse_mode="MarkdownV2" + media=image, + caption=self._prepare_text(text, parse_mode), + parse_mode=parse_mode, ) self._bot.edit_message_media( chat_id=chat_id, @@ -1042,8 +1163,8 @@ class Telegram: edit_text_kwargs: Dict[str, Any] = { "chat_id": chat_id, "message_id": message_id, - "text": standardize(text), - "parse_mode": "MarkdownV2", + "text": self._prepare_text(text, parse_mode), + "parse_mode": parse_mode, "reply_markup": reply_markup, } if disable_web_page_preview is not None: @@ -1058,8 +1179,8 @@ class Telegram: self._bot.edit_message_caption( chat_id=chat_id, message_id=message_id, - caption=standardize(text), - parse_mode="MarkdownV2", + caption=self._prepare_text(text, parse_mode), + parse_mode=parse_mode, reply_markup=reply_markup, ) return True @@ -1074,16 +1195,19 @@ class Telegram: caption="", reply_markup: Optional[InlineKeyboardMarkup] = None, disable_web_page_preview: Optional[bool] = None, + parse_mode: Optional[str] = None, ): """ 向Telegram发送报文,返回发送的消息对象 :param reply_markup: 内联键盘 :param disable_web_page_preview: 是否禁用链接预览 + :param parse_mode: Telegram 消息格式类型,默认 MarkdownV2,可传 HTML :return: 发送成功返回消息对象,失败返回None """ + parse_mode = self._normalize_parse_mode(parse_mode) kwargs = { "chat_id": userid or self._telegram_chat_id, - "parse_mode": "MarkdownV2", + "parse_mode": parse_mode, "reply_markup": reply_markup, } # 处理图片 @@ -1096,6 +1220,14 @@ class Telegram: ret = self.__send_short_message(image, caption, disable_web_page_preview=disable_web_page_preview, **kwargs) + elif self._is_html_parse_mode(parse_mode): + ret = self.__send_long_plain_message( + image, + caption, + caption_limit, + disable_web_page_preview=disable_web_page_preview, + **kwargs, + ) else: sent_idx = set() ret = self.__send_long_message(image, caption, sent_idx, @@ -1125,20 +1257,61 @@ class Telegram: """ 发送短消息 """ + parse_mode = kwargs.get("parse_mode") try: if image: return self._bot.send_photo( - photo=image, caption=standardize(caption), **kwargs + photo=image, + caption=self._prepare_text(caption, parse_mode), + **kwargs, ) else: return self._bot.send_message( - text=standardize(caption), + text=self._prepare_text(caption, parse_mode), disable_web_page_preview=disable_web_page_preview, **kwargs ) except Exception: raise RetryException(f"发送{'图片' if image else '文本'}消息失败") + @retry(RetryException, logger=logger) + def __send_long_plain_message( + self, + image: Optional[bytes], + caption: str, + caption_limit: int, + disable_web_page_preview: Optional[bool] = None, + **kwargs, + ): + """ + 按 Telegram 长度限制发送长文本。 + """ + reply_markup = kwargs.pop("reply_markup", None) + chunks = self._split_plain_text(caption, caption_limit) + ret = None + try: + for index, chunk in enumerate(chunks): + current_reply_markup = reply_markup if index == 0 else None + if image and index == 0: + ret = self._bot.send_photo( + **kwargs, + photo=image, + caption=chunk, + reply_markup=current_reply_markup, + ) + continue + msg_kwargs = dict(**kwargs) + if disable_web_page_preview is not None: + msg_kwargs["disable_web_page_preview"] = disable_web_page_preview + ret = self._bot.send_message( + **msg_kwargs, + text=chunk, + reply_markup=current_reply_markup, + ) + return ret + except Exception as err: + raise RetryException("长消息发送失败") from err + @retry(RetryException, logger=logger) def __send_long_message( self, image: Optional[bytes], caption: str, sent_idx: set, diff --git a/app/schemas/message.py b/app/schemas/message.py index d3624ad1..7fdb6160 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -223,6 +223,8 @@ class Notification(BaseModel): original_chat_id: Optional[str] = None # 是否禁用链接预览(仅Telegram支持) disable_web_page_preview: Optional[bool] = None + # Telegram 消息格式类型,默认 MarkdownV2,可传 HTML + parse_mode: Optional[str] = None # 是否写入消息历史 save_history: bool = True diff --git a/tests/test_telegram.py b/tests/test_telegram.py index 9f917811..9035a142 100644 --- a/tests/test_telegram.py +++ b/tests/test_telegram.py @@ -2,12 +2,17 @@ """ Telegram 模块单元测试(pytest 原生)。 """ +from types import SimpleNamespace +from unittest.mock import MagicMock, Mock, patch + import pytest -from unittest.mock import MagicMock, patch from app.core.context import MediaInfo, Context, TorrentInfo from app.core.metainfo import MetaInfo +from app.modules.telegram import TelegramModule from app.modules.telegram.telegram import Telegram +from app.schemas import Notification +from app.schemas.types import MessageChannel from app.schemas.types import MediaType @@ -243,6 +248,56 @@ def test_send_msg_markdown_escaping(telegram): # 验证返回值:send_msg 失败时返回 {"success": False}(非空字典),故显式断言 success assert result and result.get("success") + send_kwargs = telegram.bot.send_message.call_args.kwargs + assert send_kwargs["parse_mode"] == "MarkdownV2" + assert send_kwargs["text"].startswith("*测试标题*\n") + + +def test_send_msg_with_html_parse_mode_keeps_html(telegram): + """HTML模式发送时应保留调用方传入的HTML内容""" + result = telegram.send_msg( + title="测试 <标题>", + text="
第一行
加粗", + link="https://example.com/?a=1&b=2", + parse_mode="HTML", + ) + + assert result and result.get("success") + send_kwargs = telegram.bot.send_message.call_args.kwargs + assert send_kwargs["parse_mode"] == "HTML" + assert send_kwargs["text"] == ( + '测试 <标题>\n' + '
第一行
加粗\n' + '查看详情' + ) + + +def test_telegram_module_passes_parse_mode_to_client(): + """模块发送通知时应透传消息指定的parse_mode""" + module = TelegramModule() + client = Mock() + + with patch.object( + module, + "get_configs", + return_value={"telegram-test": SimpleNamespace(name="telegram-test")}, + ), patch.object( + module, "check_message", return_value=True + ), patch.object( + module, "get_instance", return_value=client + ): + module.post_message( + Notification( + channel=MessageChannel.Telegram, + source="telegram-test", + title="HTML", + text="正文", + parse_mode="HTML", + ) + ) + + client.send_msg.assert_called_once() + assert client.send_msg.call_args.kwargs["parse_mode"] == "HTML" def test_edit_msg_falls_back_to_caption_when_original_message_has_no_text(telegram): @@ -274,6 +329,22 @@ def test_edit_msg_falls_back_to_caption_when_original_message_has_no_text(telegr assert caption_kwargs["reply_markup"] is not None +def test_edit_msg_with_html_parse_mode_keeps_html(telegram): + """HTML模式编辑消息时应保留HTML内容""" + result = telegram.edit_msg( + chat_id="1051253579", + message_id="110502", + title="标题", + text="
请选择
", + parse_mode="HTML", + ) + + assert result is True + edit_kwargs = telegram.bot.edit_message_text.call_args.kwargs + assert edit_kwargs["parse_mode"] == "HTML" + assert edit_kwargs["text"] == "标题\n
请选择
" + + def test_edit_msg_keeps_other_edit_errors_failed(telegram): """非图片 caption 场景的编辑错误不应被错误标记为成功。""" telegram.bot.edit_message_text.side_effect = Exception(