Merge pull request #2946 from InfinityPacer/feature/push

This commit is contained in:
jxxghp
2024-10-26 13:40:38 +08:00
committed by GitHub
6 changed files with 315 additions and 132 deletions

View File

@@ -20,6 +20,7 @@ from app.schemas.event import CommandRegisterEventData
from app.schemas.types import EventType, MessageChannel, ChainEventType
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
from app.utils.structures import DictUtils
class CommandChain(ChainBase, metaclass=Singleton):
@@ -173,29 +174,40 @@ class CommandChain(ChainBase, metaclass=Singleton):
**self._other_commands
}
# 强制触发注册
force_register = False
# 触发事件允许可以拦截和调整命令
event, initial_commands = self.__trigger_register_commands_event()
# 如果事件返回有效的 event_data使用事件中调整后的命令
if event and event.event_data:
# 如果事件返回有效的 event_data使用事件中调整后的命令
event_data: CommandRegisterEventData = event.event_data
# 如果事件被取消,跳过命令注册
if event_data.cancel:
logger.debug(f"Command initialization canceled by event: {event_data.source}")
return
# 如果拦截源与插件标识一致时,这里认为需要强制触发注册
if pid is not None and pid == event_data.source:
force_register = True
initial_commands = event_data.commands or {}
logger.debug(f"Registering command count from event: {len(initial_commands)}")
else:
logger.debug(f"Registering initial command count: {len(initial_commands)}")
# initial_commands 必须是 self._commands 的子集
filtered_initial_commands = {
cmd: details for cmd, details in initial_commands.items() if cmd in self._commands
}
filtered_initial_commands = DictUtils.filter_keys_to_subset(initial_commands, self._commands)
# 如果 filtered_initial_commands 为空,则跳过注册
if not filtered_initial_commands and not force_register:
logger.debug("Filtered commands are empty, skipping registration.")
return
# 对比调整后的命令与当前命令
if filtered_initial_commands == self._registered_commands:
logger.debug("Command set unchanged, skipping broadcast registration.")
else:
logger.debug("Command set has changed, Updating and broadcasting new commands.")
if filtered_initial_commands != self._registered_commands or force_register:
logger.debug("Command set has changed or force registration is enabled.")
self._registered_commands = filtered_initial_commands
super().register_commands(commands=filtered_initial_commands)
else:
logger.debug("Command set unchanged, skipping broadcast registration.")
except Exception as e:
logger.error(f"Error occurred during command initialization in background: {e}", exc_info=True)
@@ -220,13 +232,16 @@ class CommandChain(ChainBase, metaclass=Singleton):
command_data["pid"] = plugin_id
commands[cmd] = command_data
# 触发事件允许可以拦截和调整命令
commands = {}
# 初始化命令字典
commands: Dict[str, dict] = {}
add_commands(self._preset_commands, "preset")
add_commands(self._plugin_commands, "plugin")
add_commands(self._other_commands, "other")
# 触发事件允许可以拦截和调整命令
event_data = CommandRegisterEventData(commands=commands, origin="CommandChain", service=None)
return eventmanager.send_event(ChainEventType.CommandRegister, event_data), commands
event = eventmanager.send_event(ChainEventType.CommandRegister, event_data)
return event, commands
def __build_plugin_commands(self, pid: Optional[str] = None) -> Dict[str, dict]:
"""

View File

@@ -1,12 +1,16 @@
import copy
import json
from typing import Optional, Union, List, Tuple, Any, Dict
from app.core.context import MediaInfo, Context
from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.telegram.telegram import Telegram
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import ModuleType
from app.schemas.event import CommandRegisterEventData
from app.schemas.types import ModuleType, ChainEventType
from app.utils.structures import DictUtils
class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
@@ -189,5 +193,41 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
注册命令,实现这个函数接收系统可用的命令菜单
:param commands: 命令字典
"""
for client in self.get_instances().values():
client.register_commands(commands)
for client_config in self.get_configs().values():
client = self.get_instance(client_config.name)
if not client:
continue
# 触发事件,允许调整命令数据,这里需要进行深复制,避免实例共享
scoped_commands = copy.deepcopy(commands)
event = eventmanager.send_event(
ChainEventType.CommandRegister,
CommandRegisterEventData(commands=scoped_commands, origin="Telegram", service=client_config.name)
)
# 如果事件返回有效的 event_data使用事件中调整后的命令
if event and event.event_data:
event_data: CommandRegisterEventData = event.event_data
# 如果事件被取消,跳过命令注册,并清理菜单
if event_data.cancel:
client.delete_commands()
logger.debug(
f"Command registration for {client_config.name} canceled by event: {event_data.source}"
)
continue
scoped_commands = event_data.commands or {}
if not scoped_commands:
logger.debug("Filtered commands are empty, skipping registration.")
client.delete_commands()
# scoped_commands 必须是 commands 的子集
filtered_scoped_commands = DictUtils.filter_keys_to_subset(scoped_commands, commands)
# 如果 filtered_scoped_commands 为空,则跳过注册
if not filtered_scoped_commands:
logger.debug("Filtered commands are empty, skipping registration.")
client.delete_commands()
continue
# 对比调整后的命令与当前命令
if filtered_scoped_commands != commands:
logger.debug(f"Command set has changed, Updating new commands: {filtered_scoped_commands}")
client.register_commands(filtered_scoped_commands)

View File

@@ -256,6 +256,15 @@ class Telegram:
]
)
def delete_commands(self):
"""
清理菜单命令
"""
if not self._bot:
return
# 清理菜单命令
self._bot.delete_my_commands()
def stop(self):
"""
停止Telegram消息接收服务

View File

@@ -1,14 +1,18 @@
import copy
import xml.dom.minidom
from typing import Optional, Union, List, Tuple, Any, Dict
from app.core.context import Context, MediaInfo
from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt
from app.modules.wechat.wechat import WeChat
from app.schemas import MessageChannel, CommingMessage, Notification
from app.schemas.types import ModuleType
from app.schemas.event import CommandRegisterEventData
from app.schemas.types import ModuleType, ChainEventType
from app.utils.dom import DomUtils
from app.utils.structures import DictUtils
class WechatModule(_ModuleBase, _MessageBase[WeChat]):
@@ -222,7 +226,42 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
# 如果没有配置消息解密相关参数,则也没有必要进行菜单初始化
if not client_config.config.get("WECHAT_ENCODING_AESKEY") or not client_config.config.get("WECHAT_TOKEN"):
logger.debug(f"{client_config.name} 缺少消息解密参数,跳过后续菜单初始化")
else:
client = self.get_instance(client_config.name)
if client:
client.create_menus(commands)
continue
client = self.get_instance(client_config.name)
if not client:
continue
# 触发事件,允许调整命令数据,这里需要进行深复制,避免实例共享
scoped_commands = copy.deepcopy(commands)
event = eventmanager.send_event(
ChainEventType.CommandRegister,
CommandRegisterEventData(commands=scoped_commands, origin="WeChat", service=client_config.name)
)
# 如果事件返回有效的 event_data使用事件中调整后的命令
if event and event.event_data:
event_data: CommandRegisterEventData = event.event_data
# 如果事件被取消,跳过命令注册,并清理菜单
if event_data.cancel:
client.delete_menus()
logger.debug(
f"Command registration for {client_config.name} canceled by event: {event_data.source}"
)
continue
scoped_commands = event_data.commands or {}
if not scoped_commands:
logger.debug("Filtered commands are empty, skipping registration.")
client.delete_menus()
# scoped_commands 必须是 commands 的子集
filtered_scoped_commands = DictUtils.filter_keys_to_subset(scoped_commands, commands)
# 如果 filtered_scoped_commands 为空,则跳过注册
if not filtered_scoped_commands:
logger.debug("Filtered commands are empty, skipping registration.")
client.delete_menus()
continue
# 对比调整后的命令与当前命令
if filtered_scoped_commands != commands:
logger.debug(f"Command set has changed, Updating new commands: {filtered_scoped_commands}")
client.create_menus(filtered_scoped_commands)

View File

@@ -252,51 +252,55 @@ class WeChat:
:param link: 跳转链接
:return: 发送状态,错误信息
"""
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
try:
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
if image:
ret_code = self.__send_image_message(title=title, text=text, image_url=image, userid=userid, link=link)
else:
ret_code = self.__send_message(title=title, text=text, userid=userid, link=link)
if image:
ret_code = self.__send_image_message(title=title, text=text, image_url=image, userid=userid, link=link)
else:
ret_code = self.__send_message(title=title, text=text, userid=userid, link=link)
return ret_code
return ret_code
except Exception as e:
logger.error(f"发送消息失败:{e}")
return False
def send_medias_msg(self, medias: List[MediaInfo], userid: str = "") -> Optional[bool]:
"""
发送列表类消息
"""
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
if not userid:
userid = "@all"
articles = []
index = 1
for media in medias:
if media.vote_average:
title = f"{index}. {media.title_year}\n类型:{media.type.value},评分:{media.vote_average}"
else:
title = f"{index}. {media.title_year}\n类型:{media.type.value}"
articles.append({
"title": title,
"description": "",
"picurl": media.get_message_image() if index == 1 else media.get_poster_image(),
"url": media.detail_link
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
try:
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
if not userid:
userid = "@all"
articles = []
index = 1
for media in medias:
if media.vote_average:
title = f"{index}. {media.title_year}\n类型:{media.type.value},评分:{media.vote_average}"
else:
title = f"{index}. {media.title_year}\n类型:{media.type.value}"
articles.append({
"title": title,
"description": "",
"picurl": media.get_message_image() if index == 1 else media.get_poster_image(),
"url": media.detail_link
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
return self.__post_request(self._send_msg_url, req_json)
except Exception as e:
logger.error(f"发送消息失败:{e}")
@@ -307,49 +311,49 @@ class WeChat:
"""
发送列表消息
"""
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
# 先发送标题
if title:
self.__send_message(title=title, userid=userid, link=link)
# 发送列表
if not userid:
userid = "@all"
articles = []
index = 1
for context in torrents:
torrent = context.torrent_info
meta = MetaInfo(title=torrent.title, subtitle=torrent.description)
mediainfo = context.media_info
torrent_title = f"{index}.【{torrent.site_name}" \
f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group} " \
f"{StringUtils.str_filesize(torrent.size)} " \
f"{torrent.volume_factor} " \
f"{torrent.seeders}"
torrent_title = re.sub(r"\s+", " ", torrent_title).strip()
articles.append({
"title": torrent_title,
"description": torrent.description if index == 1 else "",
"picurl": mediainfo.get_message_image() if index == 1 else "",
"url": torrent.page_url
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
try:
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
# 先发送标题
if title:
self.__send_message(title=title, userid=userid, link=link)
# 发送列表
if not userid:
userid = "@all"
articles = []
index = 1
for context in torrents:
torrent = context.torrent_info
meta = MetaInfo(title=torrent.title, subtitle=torrent.description)
mediainfo = context.media_info
torrent_title = f"{index}.【{torrent.site_name}" \
f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group} " \
f"{StringUtils.str_filesize(torrent.size)} " \
f"{torrent.volume_factor} " \
f"{torrent.seeders}"
torrent_title = re.sub(r"\s+", " ", torrent_title).strip()
articles.append({
"title": torrent_title,
"description": torrent.description if index == 1 else "",
"picurl": mediainfo.get_message_image() if index == 1 else "",
"url": torrent.page_url
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
return self.__post_request(self._send_msg_url, req_json)
except Exception as e:
logger.error(f"发送消息失败:{e}")
@@ -424,49 +428,53 @@ class WeChat:
]
}
"""
# 请求URL
req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid)
try:
# 请求URL
req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid)
# 对commands按category分组
category_dict = {}
for key, value in commands.items():
category: str = value.get("category")
if category:
if not category_dict.get(category):
category_dict[category] = {}
category_dict[category][key] = value
# 对commands按category分组
category_dict = {}
for key, value in commands.items():
category: str = value.get("category")
if category:
if not category_dict.get(category):
category_dict[category] = {}
category_dict[category][key] = value
# 一级菜单
buttons = []
for category, menu in category_dict.items():
# 二级菜单
sub_buttons = []
for key, value in menu.items():
sub_buttons.append({
"type": "click",
"name": value.get("description"),
"key": key
# 一级菜单
buttons = []
for category, menu in category_dict.items():
# 二级菜单
sub_buttons = []
for key, value in menu.items():
sub_buttons.append({
"type": "click",
"name": value.get("description"),
"key": key
})
buttons.append({
"name": category,
"sub_button": sub_buttons[:5]
})
buttons.append({
"name": category,
"sub_button": sub_buttons[:5]
})
if buttons:
# 发送请求
try:
if buttons:
# 发送请求
self.__post_request(req_url, {
"button": buttons[:3]
})
except Exception as e:
logger.error(f"创建菜单失败:{e}")
return False
except Exception as e:
logger.error(f"创建菜单失败:{e}")
return False
def delete_menus(self):
"""
删除微信菜单
"""
# 请求URL
req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid)
# 发送请求
RequestUtils().get(req_url)
try:
# 请求URL
req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid)
# 发送请求
RequestUtils().get(req_url)
except Exception as e:
logger.error(f"删除菜单失败:{e}")
return False

72
app/utils/structures.py Normal file
View File

@@ -0,0 +1,72 @@
from typing import Dict, List, Set, TypeVar, Any, Union
K = TypeVar("K")
V = TypeVar("V")
class DictUtils:
@staticmethod
def filter_keys_to_subset(source: Dict[K, V], reference: Dict[K, V]) -> Dict[K, V]:
"""
过滤 source 字典,使其键成为 reference 字典键的子集
:param source: 要被过滤的字典
:param reference: 参考字典,定义允许的键
:return: 过滤后的字典,只包含在 reference 中存在的键
"""
if not isinstance(source, dict) or not isinstance(reference, dict):
return {}
return {key: value for key, value in source.items() if key in reference}
@staticmethod
def is_keys_subset(source: Dict[K, V], reference: Dict[K, V]) -> bool:
"""
判断 source 字典的键是否为 reference 字典键的子集
:param source: 要检查的字典
:param reference: 参考字典
:return: 如果 source 的键是 reference 的键子集,则返回 True否则返回 False
"""
if not isinstance(source, dict) or not isinstance(reference, dict):
return False
return all(key in reference for key in source)
class ListUtils:
@staticmethod
def flatten(nested_list: Union[List[List[Any]], List[Any]]) -> List[Any]:
"""
将嵌套的列表展平成单个列表
:param nested_list: 嵌套的列表
:return: 展平后的列表
"""
if not isinstance(nested_list, list):
return []
# 检查是否嵌套,若不嵌套直接返回
if not any(isinstance(sublist, list) for sublist in nested_list):
return nested_list
return [item for sublist in nested_list if isinstance(sublist, list) for item in sublist]
class SetUtils:
@staticmethod
def flatten(nested_sets: Union[Set[Set[Any]], Set[Any]]) -> Set[Any]:
"""
将嵌套的集合展开为单个集合
:param nested_sets: 嵌套的集合
:return: 展开的集合
"""
if not isinstance(nested_sets, set):
return set()
# 检查是否嵌套,若不嵌套直接返回
if not any(isinstance(subset, set) for subset in nested_sets):
return nested_sets
return {item for subset in nested_sets if isinstance(subset, set) for item in subset}