feat:Telegram、Slack 支持按钮

This commit is contained in:
jxxghp
2025-06-15 15:34:06 +08:00
parent c534e3dcb8
commit 95a827e8a2
6 changed files with 918 additions and 134 deletions

View File

@@ -1,6 +1,6 @@
import gc
import re
from typing import Any, Optional, Dict, Union
from typing import Any, Optional, Dict, Union, List
from app.chain import ChainBase
from app.chain.download import DownloadChain
@@ -14,6 +14,7 @@ from app.db.user_oper import UserOper
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import Notification, NotExistMediaInfo, CommingMessage
from app.schemas.message import ChannelCapabilityManager
from app.schemas.types import EventType, MessageChannel, MediaType
from app.utils.string import StringUtils
@@ -145,7 +146,13 @@ class MessageChain(ChainBase):
action=0
)
# 处理消息
if text.startswith('/'):
if text.startswith('CALLBACK:'):
# 处理按钮回调(适配支持回调的渠道)
if ChannelCapabilityManager.supports_callbacks(channel):
self._handle_callback(text, channel, source, userid, username)
else:
logger.warning(f"渠道 {channel.value} 不支持回调,但收到了回调消息:{text}")
elif text.startswith('/'):
# 执行命令
self.eventmanager.send_event(
EventType.CommandExcute,
@@ -468,6 +475,147 @@ class MessageChain(ChainBase):
gc.collect()
def _handle_callback(self, text: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str) -> None:
"""
处理按钮回调
"""
# 提取回调数据
callback_data = text[9:] # 去掉 "CALLBACK:" 前缀
logger.info(f"处理按钮回调:{callback_data}")
# 解析回调数据
if callback_data.startswith("page_"):
# 翻页操作(旧格式,保持兼容)
self._handle_page_callback(callback_data, channel, source, userid)
elif callback_data.startswith("select_"):
# 选择操作或翻页操作
if callback_data in ["select_p", "select_n"]:
# 翻页操作:直接调用原来的文本处理逻辑
page_text = callback_data.split("_")[1] # 提取 "p" 或 "n"
self.handle_message(channel, source, userid, username, page_text)
else:
# 选择操作
self._handle_select_callback(callback_data, channel, source, userid, username)
elif callback_data.startswith("download_"):
# 下载操作
self._handle_download_callback(callback_data, channel, source, userid, username)
elif callback_data.startswith("subscribe_"):
# 订阅操作
self._handle_subscribe_callback(callback_data, channel, source, userid, username)
else:
# 其他自定义回调
logger.info(f"未知的回调数据:{callback_data}")
def handle_callback_message(self, coming_message: 'CommingMessage') -> None:
"""
处理带有回调信息的消息(新的增强接口)
"""
if not coming_message.is_callback or not coming_message.callback_data:
return
logger.info(f"处理回调消息:{coming_message.callback_data},用户:{coming_message.userid}")
# 加载缓存
user_cache: Dict[str, dict] = self.load_cache(self._cache_file) or {}
# 解析回调数据
callback_data = coming_message.callback_data
if callback_data.startswith("page_"):
# 翻页操作(旧格式,保持兼容)
self._handle_page_callback(callback_data, coming_message.channel,
coming_message.source, coming_message.userid)
elif callback_data.startswith("select_"):
# 选择操作或翻页操作
if callback_data in ["select_p", "select_n"]:
# 翻页操作:直接调用原来的文本处理逻辑
page_text = callback_data.split("_")[1] # 提取 "p" 或 "n"
self.handle_message(coming_message.channel, coming_message.source,
coming_message.userid, coming_message.username, page_text)
else:
# 选择操作
self._handle_select_callback(callback_data, coming_message.channel,
coming_message.source, coming_message.userid,
coming_message.username)
elif callback_data.startswith("download_"):
# 下载操作
self._handle_download_callback(callback_data, coming_message.channel,
coming_message.source, coming_message.userid,
coming_message.username)
elif callback_data.startswith("subscribe_"):
# 订阅操作
self._handle_subscribe_callback(callback_data, coming_message.channel,
coming_message.source, coming_message.userid,
coming_message.username)
else:
# 其他自定义回调
logger.info(f"未知的回调数据:{callback_data}")
# 保存缓存
self.save_cache(user_cache, self._cache_file)
def _handle_page_callback(self, callback_data: str, channel: MessageChannel, source: str,
userid: Union[str, int]) -> None:
"""
处理翻页回调
"""
try:
page = int(callback_data.split("_")[1])
# 获取当前页面
global _current_page
# 判断是上一页还是下一页
if page < _current_page:
# 上一页,调用原来的 "p" 逻辑
self.handle_message(channel, source, userid, "", "p")
elif page > _current_page:
# 下一页,调用原来的 "n" 逻辑
self.handle_message(channel, source, userid, "", "n")
# 如果 page == _current_page说明是当前页不需要处理
except (ValueError, IndexError) as e:
logger.error(f"处理翻页回调失败:{e}")
def _handle_select_callback(self, callback_data: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str) -> None:
"""
处理选择回调
"""
try:
index = int(callback_data.split("_")[1])
# 调用原有的数字选择逻辑
self.handle_message(channel, source, userid, username, str(index + 1))
except (ValueError, IndexError) as e:
logger.error(f"处理选择回调失败:{e}")
def _handle_download_callback(self, callback_data: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str) -> None:
"""
处理下载回调
"""
try:
if callback_data == "download_auto":
# 自动选择下载
self.handle_message(channel, source, userid, username, "0")
else:
index = int(callback_data.split("_")[1])
self.handle_message(channel, source, userid, username, str(index + 1))
except (ValueError, IndexError) as e:
logger.error(f"处理下载回调失败:{e}")
def _handle_subscribe_callback(self, callback_data: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str) -> None:
"""
处理订阅回调
"""
try:
index = int(callback_data.split("_")[1])
self.handle_message(channel, source, userid, username, str(index + 1))
except (ValueError, IndexError) as e:
logger.error(f"处理订阅回调失败:{e}")
def __auto_download(self, channel: MessageChannel, source: str, cache_list: list[Context],
userid: Union[str, int], username: str,
no_exists: Optional[Dict[Union[int, str], Dict[int, NotExistMediaInfo]]] = None):
@@ -521,35 +669,147 @@ class MessageChain(ChainBase):
note=note)
def __post_medias_message(self, channel: MessageChannel, source: str,
title: str, items: list, userid: str, total: int):
title: str, items: list, userid: str, total: int, current_page: int = 0):
"""
发送媒体列表消息
"""
if total > self._page_size:
title = f"{title}】共找到{total}条相关信息请回复对应数字选择p: 上一页 n: 下一页)"
else:
title = f"{title}】共找到{total}条相关信息,请回复对应数字选择"
self.post_medias_message(Notification(
channel=channel,
source=source,
title=title,
userid=userid
), medias=items)
# 检查渠道是否支持按钮
supports_buttons = ChannelCapabilityManager.supports_buttons(channel)
def __post_torrents_message(self, channel: MessageChannel, source: str,
title: str, items: list,
userid: str, total: int):
"""
发送种子列表消息
"""
if total > self._page_size:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择 p: 上一页 n: 下一页)"
if supports_buttons:
# 支持按钮的渠道
if total > self._page_size:
title = f"{title}】共找到{total}条相关信息,请选择操作"
else:
title = f"{title}】共找到{total}条相关信息,请选择操作"
buttons = self._create_media_buttons(channel, items, current_page, total)
else:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择)"
self.post_torrents_message(Notification(
# 不支持按钮的渠道,使用文本提示
if total > self._page_size:
title = f"{title}】共找到{total}条相关信息请回复对应数字选择p: 上一页 n: 下一页)"
else:
title = f"{title}】共找到{total}条相关信息,请回复对应数字选择"
buttons = None
notification = Notification(
channel=channel,
source=source,
title=title,
userid=userid,
link=settings.MP_DOMAIN('#/resource')
), torrents=items)
buttons=buttons
)
self.post_medias_message(notification, medias=items)
def _create_media_buttons(self, channel: MessageChannel, items: list,
current_page: int, total: int) -> List[List[Dict]]:
"""
创建媒体选择按钮
"""
buttons = []
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
max_per_row = ChannelCapabilityManager.get_max_buttons_per_row(channel)
# 为每个媒体项创建选择按钮
for i in range(len(items)):
media = items[i]
button_text = f"{i + 1}. {media.title_year}"
if len(button_text) > max_text_length:
button_text = button_text[:max_text_length - 3] + "..."
# 根据渠道配置决定按钮布局
if max_per_row == 1:
buttons.append([{"text": button_text, "callback_data": f"select_{current_page * self._page_size + i}"}])
else:
# 多按钮一行的情况,简化按钮文本
short_text = f"{i + 1}"
buttons.append([{"text": short_text, "callback_data": f"select_{current_page * self._page_size + i}"}])
# 添加翻页按钮
if total > self._page_size:
page_buttons = []
if current_page > 0:
page_buttons.append({"text": "⬅️ 上一页", "callback_data": "select_p"})
if (current_page + 1) * self._page_size < total:
page_buttons.append({"text": "下一页 ➡️", "callback_data": "select_n"})
if page_buttons:
buttons.append(page_buttons)
return buttons
def __post_torrents_message(self, channel: MessageChannel, source: str,
title: str, items: list,
userid: str, total: int, current_page: int = 0):
"""
发送种子列表消息
"""
# 检查渠道是否支持按钮
supports_buttons = ChannelCapabilityManager.supports_buttons(channel)
if supports_buttons:
# 支持按钮的渠道
if total > self._page_size:
title = f"{title}】共找到{total}条相关资源,请选择下载"
else:
title = f"{title}】共找到{total}条相关资源,请选择下载"
buttons = self._create_torrent_buttons(channel, items, current_page, total)
else:
# 不支持按钮的渠道,使用文本提示
if total > self._page_size:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择 p: 上一页 n: 下一页)"
else:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择)"
buttons = None
notification = Notification(
channel=channel,
source=source,
title=title,
userid=userid,
link=settings.MP_DOMAIN('#/resource'),
buttons=buttons
)
self.post_torrents_message(notification, torrents=items)
def _create_torrent_buttons(self, channel: MessageChannel, items: list,
current_page: int, total: int) -> List[List[Dict]]:
"""
创建种子下载按钮
"""
buttons = []
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
max_per_row = ChannelCapabilityManager.get_max_buttons_per_row(channel)
# 自动选择按钮
buttons.append([{"text": "🤖 自动选择下载", "callback_data": "download_auto"}])
# 为每个种子项创建下载按钮
for i in range(len(items)):
context = items[i]
torrent = context.torrent_info
# 根据渠道配置调整按钮文本
if max_per_row == 1:
button_text = f"{i + 1}. {torrent.site_name} - {torrent.seeders}"
if len(button_text) > max_text_length:
button_text = button_text[:max_text_length - 3] + "..."
else:
# 多按钮一行的情况,使用简化文本
button_text = f"{i + 1}"
buttons.append([{"text": button_text, "callback_data": f"download_{current_page * self._page_size + i}"}])
# 添加翻页按钮
if total > self._page_size:
page_buttons = []
if current_page > 0:
page_buttons.append({"text": "⬅️ 上一页", "callback_data": "select_p"})
if (current_page + 1) * self._page_size < total:
page_buttons.append({"text": "下一页 ➡️", "callback_data": "select_n"})
if page_buttons:
buttons.append(page_buttons)
return buttons

View File

@@ -81,8 +81,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def message_parser(self, source: str, body: Any, form: Any,
args: Any) -> Optional[CommingMessage]:
def message_parser(self, source: str, body: Any, form: Any, args: Any) -> Optional[CommingMessage]:
"""
解析消息内容,返回字典,注意以下约定值:
userid: 用户ID
@@ -219,8 +218,23 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
username = msg_json.get("user")
elif msg_json.get("type") == "block_actions":
userid = msg_json.get("user", {}).get("id")
text = msg_json.get("actions")[0].get("value")
callback_data = msg_json.get("actions")[0].get("value")
# 使用CALLBACK前缀标识按钮回调
text = f"CALLBACK:{callback_data}"
username = msg_json.get("user", {}).get("name")
logger.info(f"收到来自 {client_config.name} 的Slack按钮回调"
f"userid={userid}, username={username}, callback_data={callback_data}")
# 创建包含回调信息的CommingMessage
return CommingMessage(
channel=MessageChannel.Slack,
source=client_config.name,
userid=userid,
username=username,
text=text,
is_callback=True,
callback_data=callback_data
)
elif msg_json.get("type") == "event_callback":
userid = msg_json.get('event', {}).get('user')
text = re.sub(r"<@[0-9A-Z]+>", "", msg_json.get("event", {}).get("text"), flags=re.IGNORECASE).strip()
@@ -259,7 +273,8 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
client: Slack = self.get_instance(conf.name)
if client:
client.send_msg(title=message.title, text=message.text,
image=message.image, userid=userid, link=message.link)
image=message.image, userid=userid, link=message.link,
buttons=message.buttons)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
@@ -273,7 +288,8 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
continue
client: Slack = self.get_instance(conf.name)
if client:
client.send_medias_msg(title=message.title, medias=medias, userid=message.userid)
client.send_medias_msg(title=message.title, medias=medias, userid=message.userid,
buttons=message.buttons)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
@@ -288,4 +304,4 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
client: Slack = self.get_instance(conf.name)
if client:
client.send_torrents_msg(title=message.title, torrents=torrents,
userid=message.userid)
userid=message.userid, buttons=message.buttons)

View File

@@ -101,7 +101,9 @@ class Slack:
"""
return True if self._client else False
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None, link: Optional[str] = None, userid: Optional[str] = None):
def send_msg(self, title: str, text: Optional[str] = None,
image: Optional[str] = None, link: Optional[str] = None,
userid: Optional[str] = None, buttons: Optional[List[List[dict]]] = None):
"""
发送Telegram消息
:param title: 消息标题
@@ -109,7 +111,7 @@ class Slack:
:param image: 消息图片地址
:param link: 点击消息转转的URL
:param userid: 用户ID如有则只发消息给该用户
:user_id: 发送消息的目标用户ID为空则发给管理员
:param buttons: 消息按钮列表,格式为 [[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
"""
if not self._client:
return False, "消息客户端未就绪"
@@ -139,8 +141,42 @@ class Slack:
"image_url": f"{image}",
"alt_text": f"{title}"
}})
# 链接
if link:
# 自定义按钮
if buttons:
for button_row in buttons:
elements = []
for button in button_row:
if "url" in button:
# URL按钮
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"url": button["url"],
"action_id": f"actionId-url-{len(elements)}"
})
else:
# 回调按钮
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"value": button["callback_data"],
"action_id": f"actionId-{button['callback_data']}"
})
if elements:
blocks.append({
"type": "actions",
"elements": elements
})
elif link:
# 默认链接按钮
blocks.append({
"type": "actions",
"elements": [
@@ -169,7 +205,8 @@ class Slack:
logger.error(f"Slack消息发送失败: {msg_e}")
return False, str(msg_e)
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None, title: Optional[str] = None) -> Optional[bool]:
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None, title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None) -> Optional[bool]:
"""
发送列表类消息
"""
@@ -223,23 +260,60 @@ class Slack:
}
}
)
blocks.append(
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
}
]
}
)
# 如果有自定义按钮,使用自定义按钮,否则使用默认选择按钮
if buttons:
# 使用自定义按钮通常来自MessageChain的智能生成
for button_row in buttons:
elements = []
for button in button_row:
if "url" in button:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"url": button["url"],
"action_id": f"actionId-url-{len(elements)}"
})
else:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"value": button["callback_data"],
"action_id": f"actionId-{button['callback_data']}"
})
if elements:
blocks.append({
"type": "actions",
"elements": elements
})
# 只为第一个媒体项添加按钮,避免重复
buttons = None
else:
# 使用默认选择按钮
blocks.append(
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
}
]
}
)
index += 1
# 发送
result = self._client.chat_postMessage(
@@ -252,8 +326,8 @@ class Slack:
logger.error(f"Slack消息发送失败: {msg_e}")
return False
def send_torrents_msg(self, torrents: List[Context],
userid: Optional[str] = None, title: Optional[str] = None) -> Optional[bool]:
def send_torrents_msg(self, torrents: List[Context], userid: Optional[str] = None, title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None) -> Optional[bool]:
"""
发送列表消息
"""
@@ -279,49 +353,113 @@ class Slack:
}]
# 列表
index = 1
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title = re.sub(r"\s+", " ", title).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
description = torrent.description
text = f"{index}. 【{site_name}】<{link}|{title}> " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}\n" \
f"{description}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
# 如果有自定义按钮,先添加种子列表,然后添加统一的按钮
if buttons:
# 添加种子列表(不带单独的选择按钮)
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title = re.sub(r"\s+", " ", title).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
description = torrent.description
text = f"{index}. 【{site_name}】<{link}|{title}> " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}\n" \
f"{description}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
}
}
}
)
blocks.append(
{
"type": "actions",
"elements": [
{
)
index += 1
# 添加统一的自定义按钮
for button_row in buttons:
elements = []
for button in button_row:
if "url" in button:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"text": button["text"],
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
"url": button["url"],
"action_id": f"actionId-url-{len(elements)}"
})
else:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"value": button["callback_data"],
"action_id": f"actionId-{button['callback_data']}"
})
if elements:
blocks.append({
"type": "actions",
"elements": elements
})
else:
# 使用默认的每个种子单独按钮
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title = re.sub(r"\s+", " ", title).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
description = torrent.description
text = f"{index}. 【{site_name}】<{link}|{title}> " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}\n" \
f"{description}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
}
]
}
)
index += 1
}
)
blocks.append(
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
}
]
}
)
index += 1
# 发送
result = self._client.chat_postMessage(
channel=channel,

View File

@@ -9,7 +9,8 @@ from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.telegram.telegram import Telegram
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, ConfigChangeEventData
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, ConfigChangeEventData, \
NotificationConf
from app.schemas.types import ModuleType, ChainEventType, SystemConfigKey, EventType
from app.utils.structures import DictUtils
@@ -98,6 +99,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
:return: 渠道、消息体
"""
"""
普通消息格式:
{
'update_id': ,
'message': {
@@ -119,6 +121,16 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
'text': ''
}
}
按钮回调格式:
{
'callback_query': {
'id': '',
'from': {...},
'message': {...},
'data': 'callback_data'
}
}
"""
# 获取服务配置
client_config = self.get_config(source)
@@ -130,32 +142,89 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
except Exception as err:
logger.debug(f"解析Telegram消息失败{str(err)}")
return None
if message:
text = message.get("text")
user_id = message.get("from", {}).get("id")
# 获取用户名
user_name = message.get("from", {}).get("username")
if text:
logger.info(f"收到来自 {client_config.name} 的Telegram消息"
f"userid={user_id}, username={user_name}, text={text}")
# 检查权限
admin_users = client_config.config.get("TELEGRAM_ADMINS")
user_list = client_config.config.get("TELEGRAM_USERS")
chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
if text.startswith("/"):
if admin_users \
and str(user_id) not in admin_users.split(',') \
and str(user_id) != chat_id:
client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
return None
else:
if user_list \
and not str(user_id) in user_list.split(','):
logger.info(f"用户{user_id}不在用户白名单中,无法使用此机器人")
client.send_msg(title="你不在用户白名单中,无法使用此机器人", userid=user_id)
return None
return CommingMessage(channel=MessageChannel.Telegram, source=client_config.name,
userid=user_id, username=user_name, text=text)
# 处理按钮回调
if "callback_query" in message:
return self._handle_callback_query(message, client_config)
# 处理普通消息
elif "message" in message:
return self._handle_text_message(message["message"], client_config, client)
return None
@staticmethod
def _handle_callback_query(message: dict, client_config: NotificationConf) -> Optional[CommingMessage]:
"""
处理按钮回调查询
"""
callback_query = message.get("callback_query", {})
user_info = callback_query.get("from", {})
callback_data = callback_query.get("data", "")
user_id = user_info.get("id")
user_name = user_info.get("username")
if callback_data and user_id:
logger.info(f"收到来自 {client_config.name} 的Telegram按钮回调"
f"userid={user_id}, username={user_name}, callback_data={callback_data}")
# 将callback_data作为特殊格式的text返回以便主程序识别这是按钮回调
callback_text = f"CALLBACK:{callback_data}"
# 创建包含完整回调信息的CommingMessage
return CommingMessage(
channel=MessageChannel.Telegram,
source=client_config.name,
userid=user_id,
username=user_name,
text=callback_text,
is_callback=True,
callback_data=callback_data,
message_id=callback_query.get("message", {}).get("message_id"),
chat_id=str(callback_query.get("message", {}).get("chat", {}).get("id", "")),
callback_query=callback_query
)
return None
@staticmethod
def _handle_text_message(msg: dict, client_config: NotificationConf, client: Telegram) -> Optional[CommingMessage]:
"""
处理普通文本消息
"""
text = msg.get("text")
user_id = msg.get("from", {}).get("id")
user_name = msg.get("from", {}).get("username")
if text and user_id:
logger.info(f"收到来自 {client_config.name} 的Telegram消息"
f"userid={user_id}, username={user_name}, text={text}")
# 检查权限
admin_users = client_config.config.get("TELEGRAM_ADMINS")
user_list = client_config.config.get("TELEGRAM_USERS")
chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
if text.startswith("/"):
if admin_users \
and str(user_id) not in admin_users.split(',') \
and str(user_id) != chat_id:
client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
return None
else:
if user_list \
and str(user_id) not in user_list.split(','):
logger.info(f"用户{user_id}不在用户白名单中,无法使用此机器人")
client.send_msg(title="你不在用户白名单中,无法使用此机器人", userid=user_id)
return None
return CommingMessage(
channel=MessageChannel.Telegram,
source=client_config.name,
userid=user_id,
username=user_name,
text=text
)
return None
def post_message(self, message: Notification) -> None:
@@ -177,7 +246,8 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
client: Telegram = self.get_instance(conf.name)
if client:
client.send_msg(title=message.title, text=message.text,
image=message.image, userid=userid, link=message.link)
image=message.image, userid=userid, link=message.link,
buttons=message.buttons)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
@@ -192,7 +262,8 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
client: Telegram = self.get_instance(conf.name)
if client:
client.send_medias_msg(title=message.title, medias=medias,
userid=message.userid, link=message.link)
userid=message.userid, link=message.link,
buttons=message.buttons)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
@@ -207,7 +278,8 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
client: Telegram = self.get_instance(conf.name)
if client:
client.send_torrents_msg(title=message.title, torrents=torrents,
userid=message.userid, link=message.link)
userid=message.userid, link=message.link,
buttons=message.buttons)
def register_commands(self, commands: Dict[str, dict]):
"""

View File

@@ -3,12 +3,12 @@ import threading
import uuid
from pathlib import Path
from threading import Event
from typing import Optional, List, Dict
from typing import Optional, List, Dict, Callable
from urllib.parse import urljoin
import telebot
from telebot import apihelper
from telebot.types import InputFile
from telebot.types import InputFile, InlineKeyboardMarkup, InlineKeyboardButton
from app.core.config import settings
from app.core.context import MediaInfo, Context
@@ -23,6 +23,7 @@ class Telegram:
_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/message?token={settings.API_TOKEN}"
_event = Event()
_bot: telebot.TeleBot = None
_callback_handlers: Dict[str, Callable] = {} # 存储回调处理器
def __init__(self, TELEGRAM_TOKEN: Optional[str] = None, TELEGRAM_CHAT_ID: Optional[str] = None, **kwargs):
"""
@@ -57,7 +58,39 @@ class Telegram:
@_bot.message_handler(func=lambda message: True)
def echo_all(message):
RequestUtils(timeout=5).post_res(self._ds_url, json=message.json)
RequestUtils(timeout=15).post_res(self._ds_url, json=message.json)
@_bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
"""
处理按钮点击回调
"""
try:
# 解析回调数据
callback_data = call.data
user_id = str(call.from_user.id)
logger.info(f"收到按钮回调:{callback_data},用户:{user_id}")
# 发送回调数据给主程序处理
callback_json = {
"callback_query": {
"id": call.id,
"from": call.from_user.to_dict(),
"message": call.message.to_dict(),
"data": callback_data
}
}
# 先确认回调避免用户看到loading状态
_bot.answer_callback_query(call.id)
# 发送给主程序处理
RequestUtils(timeout=15).post_res(self._ds_url, json=callback_json)
except Exception as e:
logger.error(f"处理按钮回调失败:{str(e)}")
_bot.answer_callback_query(call.id, "处理失败,请重试")
def run_polling():
"""
@@ -80,7 +113,8 @@ class Telegram:
return self._bot is not None
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None,
userid: Optional[str] = None, link: Optional[str] = None) -> Optional[bool]:
userid: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None) -> Optional[bool]:
"""
发送Telegram消息
:param title: 消息标题
@@ -88,6 +122,7 @@ class Telegram:
:param image: 消息图片地址
:param userid: 用户ID如有则只发消息给该用户
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:userid: 发送消息的目标用户ID为空则发给管理员
"""
if not self._telegram_token or not self._telegram_chat_id:
@@ -113,16 +148,27 @@ class Telegram:
else:
chat_id = self._telegram_chat_id
return self.__send_request(userid=chat_id, image=image, caption=caption)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None,
title: Optional[str] = None, link: Optional[str] = None) -> Optional[bool]:
title: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[Dict]]] = None) -> Optional[bool]:
"""
发送媒体列表消息
:param medias: 媒体信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
@@ -155,7 +201,12 @@ class Telegram:
else:
chat_id = self._telegram_chat_id
return self.__send_request(userid=chat_id, image=image, caption=caption)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
@@ -163,9 +214,14 @@ class Telegram:
def send_torrents_msg(self, torrents: List[Context],
userid: Optional[str] = None, title: Optional[str] = None,
link: Optional[str] = None) -> Optional[bool]:
link: Optional[str] = None, buttons: Optional[List[List[Dict]]] = None) -> Optional[bool]:
"""
发送列表消息
:param torrents: Torrent信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
@@ -200,17 +256,61 @@ class Telegram:
else:
chat_id = self._telegram_chat_id
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
return self.__send_request(userid=chat_id, caption=caption,
image=mediainfo.get_message_image())
image=mediainfo.get_message_image(), reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
@staticmethod
def _create_inline_keyboard(buttons: List[List[Dict]]) -> InlineKeyboardMarkup:
"""
创建内联键盘
:param buttons: 按钮配置,格式:[[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
:return: InlineKeyboardMarkup对象
"""
keyboard = []
for row in buttons:
button_row = []
for button in row:
if "url" in button:
# URL按钮
btn = InlineKeyboardButton(text=button["text"], url=button["url"])
else:
# 回调按钮
btn = InlineKeyboardButton(text=button["text"], callback_data=button["callback_data"])
button_row.append(btn)
keyboard.append(button_row)
return InlineKeyboardMarkup(keyboard)
def answer_callback_query(self, callback_query_id: int, text: Optional[str] = None,
show_alert: bool = False) -> Optional[bool]:
"""
回应回调查询
:param callback_query_id: 回调查询ID
:param text: 提示文本
:param show_alert: 是否显示弹窗提示
:return: 回应结果
"""
try:
self._bot.answer_callback_query(callback_query_id, text, show_alert)
return True
except Exception as e:
logger.error(f"回应回调查询失败:{str(e)}")
return False
@retry(Exception, logger=logger)
def __send_request(self, userid: Optional[str] = None, image="", caption="") -> bool:
def __send_request(self, userid: Optional[str] = None, image="", caption="",
reply_markup: Optional[InlineKeyboardMarkup] = None) -> bool:
"""
向Telegram发送报文
:param reply_markup: 内联键盘
"""
if image:
res = RequestUtils(proxies=settings.PROXY).get_res(image)
@@ -227,7 +327,8 @@ class Telegram:
ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id,
photo=photo,
caption=caption,
parse_mode="Markdown")
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise Exception("发送图片消息失败")
return True
@@ -237,11 +338,13 @@ class Telegram:
for i in range(0, len(caption), 4095):
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption[i:i + 4095],
parse_mode="Markdown")
parse_mode="Markdown",
reply_markup=reply_markup if i == 0 else None)
else:
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption,
parse_mode="Markdown")
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise Exception("发送文本消息失败")
return True if ret else False

View File

@@ -1,4 +1,6 @@
from typing import Optional, Union
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union, List, Dict, Set
from pydantic import BaseModel, Field
@@ -23,6 +25,16 @@ class CommingMessage(BaseModel):
date: Optional[str] = None
# 消息方向
action: Optional[int] = 0
# 是否为回调消息
is_callback: Optional[bool] = False
# 回调数据
callback_data: Optional[str] = None
# 消息ID用于回调时定位原消息
message_id: Optional[int] = None
# 聊天ID用于回调时定位聊天
chat_id: Optional[str] = None
# 完整的回调查询信息(原始数据)
callback_query: Optional[Dict] = None
def to_dict(self):
"""
@@ -65,6 +77,8 @@ class Notification(BaseModel):
action: Optional[int] = 1
# 消息目标用户ID字典未指定用户ID时使用
targets: Optional[dict] = None
# 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
buttons: Optional[List[List[dict]]] = None
def to_dict(self):
"""
@@ -115,3 +129,184 @@ class SubscriptionMessage(BaseModel):
icon: Optional[str] = None
url: Optional[str] = None
data: Optional[dict] = Field(default_factory=dict)
class ChannelCapability(Enum):
"""
渠道能力枚举
"""
# 支持内联按钮
INLINE_BUTTONS = "inline_buttons"
# 支持菜单命令
MENU_COMMANDS = "menu_commands"
# 支持消息编辑
MESSAGE_EDITING = "message_editing"
# 支持回调查询
CALLBACK_QUERIES = "callback_queries"
# 支持富文本
RICH_TEXT = "rich_text"
# 支持图片
IMAGES = "images"
# 支持链接
LINKS = "links"
# 支持文件发送
FILE_SENDING = "file_sending"
@dataclass
class ChannelCapabilities:
"""
渠道能力配置
"""
channel: MessageChannel
capabilities: Set[ChannelCapability]
max_buttons_per_row: int = 2
max_button_rows: int = 10
max_button_text_length: int = 30
fallback_enabled: bool = True
class ChannelCapabilityManager:
"""
渠道能力管理器
"""
_capabilities: Dict[MessageChannel, ChannelCapabilities] = {
MessageChannel.Telegram: ChannelCapabilities(
channel=MessageChannel.Telegram,
capabilities={
ChannelCapability.INLINE_BUTTONS,
ChannelCapability.MENU_COMMANDS,
ChannelCapability.MESSAGE_EDITING,
ChannelCapability.CALLBACK_QUERIES,
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.FILE_SENDING
},
max_buttons_per_row=2,
max_button_rows=10,
max_button_text_length=30
),
MessageChannel.Wechat: ChannelCapabilities(
channel=MessageChannel.Wechat,
capabilities={
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.MENU_COMMANDS
},
fallback_enabled=True
),
MessageChannel.Slack: ChannelCapabilities(
channel=MessageChannel.Slack,
capabilities={
ChannelCapability.INLINE_BUTTONS,
ChannelCapability.CALLBACK_QUERIES,
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.MENU_COMMANDS
},
max_buttons_per_row=3,
max_button_rows=8,
max_button_text_length=25,
fallback_enabled=True
),
MessageChannel.SynologyChat: ChannelCapabilities(
channel=MessageChannel.SynologyChat,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.VoceChat: ChannelCapabilities(
channel=MessageChannel.VoceChat,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.WebPush: ChannelCapabilities(
channel=MessageChannel.WebPush,
capabilities={
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.Web: ChannelCapabilities(
channel=MessageChannel.Web,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
)
}
@classmethod
def get_capabilities(cls, channel: MessageChannel) -> Optional[ChannelCapabilities]:
"""
获取渠道能力
"""
return cls._capabilities.get(channel)
@classmethod
def supports_capability(cls, channel: MessageChannel, capability: ChannelCapability) -> bool:
"""
检查渠道是否支持某项能力
"""
channel_caps = cls.get_capabilities(channel)
if not channel_caps:
return False
return capability in channel_caps.capabilities
@classmethod
def supports_buttons(cls, channel: MessageChannel) -> bool:
"""
检查渠道是否支持按钮
"""
return cls.supports_capability(channel, ChannelCapability.INLINE_BUTTONS)
@classmethod
def supports_callbacks(cls, channel: MessageChannel) -> bool:
"""
检查渠道是否支持回调
"""
return cls.supports_capability(channel, ChannelCapability.CALLBACK_QUERIES)
@classmethod
def get_max_buttons_per_row(cls, channel: MessageChannel) -> int:
"""
获取每行最大按钮数
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_buttons_per_row if channel_caps else 1
@classmethod
def get_max_button_rows(cls, channel: MessageChannel) -> int:
"""
获取最大按钮行数
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_button_rows if channel_caps else 5
@classmethod
def get_max_button_text_length(cls, channel: MessageChannel) -> int:
"""
获取按钮文本最大长度
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_button_text_length if channel_caps else 20
@classmethod
def should_use_fallback(cls, channel: MessageChannel) -> bool:
"""
是否应该使用降级策略
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.fallback_enabled if channel_caps else True