Files
MoviePilot/app/modules/telegram/telegram.py
jxxghp 274fc2d74f v2.8.8
- 下载器支持配置路径映射
- 问题修复与细节优化
2025-12-10 14:33:13 +08:00

666 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import re
import threading
from typing import Optional, List, Dict, Callable
from urllib.parse import urljoin
from telebot import TeleBot, apihelper
from telebot.types import BotCommand, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto
from telegramify_markdown import standardize, telegramify
from telegramify_markdown.type import ContentTypes, SentType
from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.metainfo import MetaInfo
from app.helper.thread import ThreadHelper
from app.helper.image import ImageHelper
from app.log import logger
from app.utils.common import retry
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
class RetryException(Exception):
pass
class Telegram:
_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/message?token={settings.API_TOKEN}"
_bot: TeleBot = None
_callback_handlers: Dict[str, Callable] = {} # 存储回调处理器
_user_chat_mapping: Dict[str, str] = {} # userid -> chat_id mapping for reply targeting
_bot_username: Optional[str] = None # Bot username for mention detection
def __init__(self, TELEGRAM_TOKEN: Optional[str] = None, TELEGRAM_CHAT_ID: Optional[str] = None, **kwargs):
"""
初始化参数
"""
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
logger.error("Telegram配置不完整")
return
# Token
self._telegram_token = TELEGRAM_TOKEN
# Chat Id
self._telegram_chat_id = TELEGRAM_CHAT_ID
# 初始化机器人
if self._telegram_token and self._telegram_chat_id:
# telegram bot api 地址格式https://api.telegram.org
if kwargs.get("API_URL"):
apihelper.API_URL = urljoin(kwargs["API_URL"], '/bot{0}/{1}')
apihelper.FILE_URL = urljoin(kwargs["API_URL"], '/file/bot{0}/{1}')
else:
apihelper.proxy = settings.PROXY
# bot
_bot = TeleBot(self._telegram_token, parse_mode="MarkdownV2")
# 记录句柄
self._bot = _bot
# 获取并存储bot用户名用于@检测
try:
bot_info = _bot.get_me()
self._bot_username = bot_info.username
logger.info(f"Telegram bot用户名: @{self._bot_username}")
except Exception as e:
logger.error(f"获取bot信息失败: {e}")
self._bot_username = None
# 标记渠道来源
if kwargs.get("name"):
self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}"
@_bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
_bot.reply_to(message, "温馨提示:直接发送名称或`订阅`+名称,搜索或订阅电影、电视剧")
@_bot.message_handler(func=lambda message: True)
def echo_all(message):
# Update user-chat mapping when receiving messages
self._update_user_chat_mapping(message.from_user.id, message.chat.id)
# Check if we should process this message
if self._should_process_message(message):
RequestUtils(timeout=15).post_res(self._ds_url, json=message.json)
@_bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
"""
处理按钮点击回调
"""
try:
# Update user-chat mapping for callbacks too
self._update_user_chat_mapping(call.from_user.id, call.message.chat.id)
# 解析回调数据
callback_data = call.data
user_id = str(call.from_user.id)
logger.info(f"收到按钮回调:{callback_data},用户:{user_id}")
# 发送回调数据给主程序处理
callback_json = {
"callback_query": {
"id": call.id,
"from": call.from_user.to_dict(),
"message": {
"message_id": call.message.message_id,
"chat": {
"id": call.message.chat.id,
}
},
"data": callback_data
}
}
# 先确认回调避免用户看到loading状态
_bot.answer_callback_query(call.id)
# 发送给主程序处理
RequestUtils(timeout=15).post_res(self._ds_url, json=callback_json)
except Exception as err:
logger.error(f"处理按钮回调失败:{str(err)}")
_bot.answer_callback_query(call.id, "处理失败,请重试")
def run_polling():
"""
定义线程函数来运行 infinity_polling
"""
try:
_bot.infinity_polling(long_polling_timeout=30, logger_level=None)
except Exception as err:
logger.error(f"Telegram消息接收服务异常{str(err)}")
# 启动线程来运行 infinity_polling
self._polling_thread = threading.Thread(target=run_polling, daemon=True)
self._polling_thread.start()
logger.info("Telegram消息接收服务启动")
@property
def bot_username(self) -> Optional[str]:
"""
获取Bot用户名
:return: Bot用户名或None
"""
return self._bot_username
def _update_user_chat_mapping(self, userid: int, chat_id: int) -> None:
"""
更新用户与聊天的映射关系
:param userid: 用户ID
:param chat_id: 聊天ID
"""
if userid and chat_id:
self._user_chat_mapping[str(userid)] = str(chat_id)
def _get_user_chat_id(self, userid: str) -> Optional[str]:
"""
获取用户对应的聊天ID
:param userid: 用户ID
:return: 聊天ID或None
"""
return self._user_chat_mapping.get(str(userid)) if userid else None
def _should_process_message(self, message) -> bool:
"""
判断是否应该处理这条消息
:param message: Telegram消息对象
:return: 是否处理
"""
# 私聊消息总是处理
if message.chat.type == 'private':
logger.debug(f"处理私聊消息:用户 {message.from_user.id}")
return True
# 群聊中的命令消息总是处理(以/开头)
if message.text and message.text.startswith('/'):
logger.debug(f"处理群聊命令消息:{message.text[:20]}...")
return True
# 群聊中检查是否@了机器人
if message.chat.type in ['group', 'supergroup']:
if not self._bot_username:
# 如果没有获取到bot用户名为了安全起见处理所有消息
logger.debug("未获取到bot用户名处理所有群聊消息")
return True
# 检查消息文本中是否包含@bot_username
if message.text and f"@{self._bot_username}" in message.text:
logger.debug(f"检测到@{self._bot_username},处理群聊消息")
return True
# 检查消息实体中是否有提及bot
if message.entities:
for entity in message.entities:
if entity.type == 'mention':
mention_text = message.text[entity.offset:entity.offset + entity.length]
if mention_text == f"@{self._bot_username}":
logger.debug(f"通过实体检测到@{self._bot_username},处理群聊消息")
return True
# 群聊中没有@机器人,不处理
logger.debug(f"群聊消息未@机器人,跳过处理:{message.text[:30] if message.text else 'No text'}...")
return False
# 其他类型的聊天默认处理
logger.debug(f"处理其他类型聊天消息:{message.chat.type}")
return True
def get_state(self) -> bool:
"""
获取状态
"""
return self._bot is not None
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None,
userid: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送Telegram消息
:param title: 消息标题
:param text: 消息内容
:param image: 消息图片地址
:param userid: 用户ID如有则只发消息给该用户
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
if not title and not text:
logger.warn("标题和内容不能同时为空")
return False
try:
if title and text:
caption = f"**{title}**\n{text}"
elif title:
caption = f"**{title}**"
elif text:
caption = text
else:
caption = ""
if link:
caption = f"{caption}\n[查看详情]({link})"
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
return self.__edit_message(original_chat_id, original_message_id, caption, buttons, image)
else:
# 发送新消息
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
def _determine_target_chat_id(self, userid: Optional[str] = None,
original_chat_id: Optional[str] = None) -> str:
"""
确定目标聊天ID使用用户映射确保回复到正确的聊天
:param userid: 用户ID
:param original_chat_id: 原消息的聊天ID
:return: 目标聊天ID
"""
# 1. 优先使用原消息的聊天ID (编辑消息场景)
if original_chat_id:
return original_chat_id
# 2. 如果有userid尝试从映射中获取用户的聊天ID
if userid:
mapped_chat_id = self._get_user_chat_id(userid)
if mapped_chat_id:
return mapped_chat_id
# 如果映射中没有回退到使用userid作为聊天ID (私聊场景)
return userid
# 3. 最后使用默认聊天ID
return self._telegram_chat_id
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None,
title: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[Dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送媒体列表消息
:param medias: 媒体信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
try:
index, image, caption = 1, "", "*%s*" % title
for media in medias:
if not image:
image = media.get_message_image()
if media.vote_average:
caption = "%s\n%s. [%s](%s)\n_%s%s_" % (caption,
index,
media.title_year,
media.detail_link,
f"类型:{media.type.value}",
f"评分:{media.vote_average}")
else:
caption = "%s\n%s. [%s](%s)\n_%s_" % (caption,
index,
media.title_year,
media.detail_link,
f"类型:{media.type.value}")
index += 1
if link:
caption = f"{caption}\n[查看详情]({link})"
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
return self.__edit_message(original_chat_id, original_message_id, caption, buttons, image)
else:
# 发送新消息
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
def send_torrents_msg(self, torrents: List[Context],
userid: Optional[str] = None, title: Optional[str] = None,
link: Optional[str] = None, buttons: Optional[List[List[Dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送种子列表消息
:param torrents: 种子信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
try:
index, caption = 1, "*%s*" % title
image = torrents[0].media_info.get_message_image()
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title = re.sub(r"\s+", " ", title).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
caption = f"{caption}\n{index}.【{site_name}】[{title}]({link}) " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}"
index += 1
if link:
caption = f"{caption}\n[查看详情]({link})"
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息(种子消息通常没有图片)
return self.__edit_message(original_chat_id, original_message_id, caption, buttons, image)
else:
# 发送新消息
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
@staticmethod
def _create_inline_keyboard(buttons: List[List[Dict]]) -> InlineKeyboardMarkup:
"""
创建内联键盘
:param buttons: 按钮配置,格式:[[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
:return: InlineKeyboardMarkup对象
"""
keyboard = []
for row in buttons:
button_row = []
for button in row:
if "url" in button:
# URL按钮
btn = InlineKeyboardButton(text=button["text"], url=button["url"])
else:
# 回调按钮
btn = InlineKeyboardButton(text=button["text"], callback_data=button["callback_data"])
button_row.append(btn)
keyboard.append(button_row)
return InlineKeyboardMarkup(keyboard)
def answer_callback_query(self, callback_query_id: int, text: Optional[str] = None,
show_alert: bool = False) -> Optional[bool]:
"""
回应回调查询
"""
if not self._bot:
return None
try:
self._bot.answer_callback_query(callback_query_id, text=text, show_alert=show_alert)
return True
except Exception as e:
logger.error(f"回应回调查询失败:{str(e)}")
return False
def delete_msg(self, message_id: int, chat_id: Optional[int] = None) -> Optional[bool]:
"""
删除Telegram消息
:param message_id: 消息ID
:param chat_id: 聊天ID
:return: 删除是否成功
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
try:
# 确定要删除消息的聊天ID
if chat_id:
target_chat_id = chat_id
else:
target_chat_id = self._telegram_chat_id
# 删除消息
result = self._bot.delete_message(chat_id=target_chat_id, message_id=int(message_id))
if result:
logger.info(f"成功删除Telegram消息: chat_id={target_chat_id}, message_id={message_id}")
return True
else:
logger.error(f"删除Telegram消息失败: chat_id={target_chat_id}, message_id={message_id}")
return False
except Exception as e:
logger.error(f"删除Telegram消息异常: {str(e)}")
return False
def __edit_message(self, chat_id: str, message_id: int, text: str,
buttons: Optional[List[List[dict]]] = None,
image: Optional[str] = None) -> Optional[bool]:
"""
编辑已发送的消息
:param chat_id: 聊天ID
:param message_id: 消息ID
:param text: 新的消息内容
:param buttons: 按钮列表
:param image: 图片URL或路径
:return: 编辑是否成功
"""
if not self._bot:
return None
try:
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
if image:
# 如果有图片使用edit_message_media
media = InputMediaPhoto(media=image, caption=standardize(text), parse_mode="MarkdownV2")
self._bot.edit_message_media(
chat_id=chat_id,
message_id=message_id,
media=media,
reply_markup=reply_markup
)
else:
# 如果没有图片使用edit_message_text
self._bot.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=standardize(text),
parse_mode="MarkdownV2",
reply_markup=reply_markup
)
return True
except Exception as e:
logger.error(f"编辑消息失败:{str(e)}")
return False
def __send_request(self, userid: Optional[str] = None, image="", caption="",
reply_markup: Optional[InlineKeyboardMarkup] = None) -> bool:
"""
向Telegram发送报文
:param reply_markup: 内联键盘
"""
kwargs = {
'chat_id': userid or self._telegram_chat_id,
'parse_mode': "MarkdownV2",
'reply_markup': reply_markup
}
# 处理图片
image = self.__process_image(image)
try:
# 图片消息的标题长度限制为1024文本消息为4096
caption_limit = 1024 if image else 4096
if len(caption) < caption_limit:
ret = self.__send_short_message(image, caption, **kwargs)
else:
sent_idx = set()
ret = self.__send_long_message(image, caption, sent_idx, **kwargs)
return ret is not None
except Exception as e:
logger.error(f"发送Telegram消息失败: {e}")
return False
@staticmethod
def __process_image(image_url: Optional[str]) -> Optional[bytes]:
"""
处理图片URL获取图片内容
"""
if not image_url:
return None
image = ImageHelper().fetch_image(image_url)
if not image:
logger.warn(f"图片获取失败: {image_url},仅发送文本消息")
return image
@retry(RetryException, logger=logger)
def __send_short_message(self, image: Optional[bytes], caption: str, **kwargs):
"""
发送短消息
"""
try:
if image:
return self._bot.send_photo(
photo=image,
caption=standardize(caption),
**kwargs
)
else:
return self._bot.send_message(
text=standardize(caption),
**kwargs
)
except Exception:
raise RetryException(f"发送{'图片' if image else '文本'}消息失败")
@retry(RetryException, logger=logger)
def __send_long_message(self, image: Optional[bytes], caption: str, sent_idx: set, **kwargs):
"""
发送长消息
"""
try:
reply_markup = kwargs.pop("reply_markup", None)
boxs: SentType = ThreadHelper().submit(lambda x: asyncio.run(telegramify(x)), caption).result()
ret = None
for i, item in enumerate(boxs):
if i in sent_idx:
# 跳过已发送消息
continue
current_reply_markup = reply_markup if i == 0 else None
if item.content_type == ContentTypes.TEXT and (i != 0 or not image):
ret = self._bot.send_message(**kwargs,
text=item.content,
reply_markup=current_reply_markup
)
elif item.content_type == ContentTypes.PHOTO or (image and i == 0):
ret = self._bot.send_photo(**kwargs,
photo=(getattr(item, "file_name", ""),
getattr(item, "file_data", image)),
caption=getattr(item, "caption", item.content),
reply_markup=current_reply_markup
)
elif item.content_type == ContentTypes.FILE:
ret = self._bot.send_document(**kwargs,
document=(item.file_name, item.file_data),
caption=item.caption,
reply_markup=current_reply_markup
)
sent_idx.add(i)
return ret
except Exception as e:
try:
raise RetryException(f"消息 [{i + 1}/{len(boxs)}] 发送失败") from e
except NameError:
raise
def register_commands(self, commands: Dict[str, dict]):
"""
注册菜单命令
"""
if not self._bot:
return
# 设置bot命令
if commands:
self._bot.delete_my_commands()
self._bot.set_my_commands(
commands=[
BotCommand(cmd[1:], str(desc.get("description"))) for cmd, desc in
commands.items()
]
)
def delete_commands(self):
"""
清理菜单命令
"""
if not self._bot:
return
# 清理菜单命令
self._bot.delete_my_commands()
def stop(self):
"""
停止Telegram消息接收服务
"""
if self._bot:
self._bot.stop_polling()
self._polling_thread.join()
logger.info("Telegram消息接收服务已停止")