diff --git a/app/chain/command.py b/app/chain/command.py index 45003a76..ec5a79a8 100644 --- a/app/chain/command.py +++ b/app/chain/command.py @@ -20,6 +20,7 @@ from app.schemas.event import CommandRegisterEventData from app.schemas.types import EventType, MessageChannel, ChainEventType from app.utils.object import ObjectUtils from app.utils.singleton import Singleton +from app.utils.structures import DictUtils class CommandChain(ChainBase, metaclass=Singleton): @@ -173,29 +174,40 @@ class CommandChain(ChainBase, metaclass=Singleton): **self._other_commands } + # 强制触发注册 + force_register = False # 触发事件允许可以拦截和调整命令 event, initial_commands = self.__trigger_register_commands_event() - # 如果事件返回有效的 event_data,使用事件中调整后的命令 if event and event.event_data: + # 如果事件返回有效的 event_data,使用事件中调整后的命令 event_data: CommandRegisterEventData = event.event_data + # 如果事件被取消,跳过命令注册 + if event_data.cancel: + logger.debug(f"Command initialization canceled by event: {event_data.source}") + return + # 如果拦截源与插件标识一致时,这里认为需要强制触发注册 + if pid is not None and pid == event_data.source: + force_register = True initial_commands = event_data.commands or {} logger.debug(f"Registering command count from event: {len(initial_commands)}") else: logger.debug(f"Registering initial command count: {len(initial_commands)}") # initial_commands 必须是 self._commands 的子集 - filtered_initial_commands = { - cmd: details for cmd, details in initial_commands.items() if cmd in self._commands - } + filtered_initial_commands = DictUtils.filter_keys_to_subset(initial_commands, self._commands) + # 如果 filtered_initial_commands 为空,则跳过注册 + if not filtered_initial_commands and not force_register: + logger.debug("Filtered commands are empty, skipping registration.") + return # 对比调整后的命令与当前命令 - if filtered_initial_commands == self._registered_commands: - logger.debug("Command set unchanged, skipping broadcast registration.") - else: - logger.debug("Command set has changed, Updating and broadcasting new commands.") + if filtered_initial_commands != self._registered_commands or force_register: + logger.debug("Command set has changed or force registration is enabled.") self._registered_commands = filtered_initial_commands super().register_commands(commands=filtered_initial_commands) + else: + logger.debug("Command set unchanged, skipping broadcast registration.") except Exception as e: logger.error(f"Error occurred during command initialization in background: {e}", exc_info=True) @@ -220,13 +232,16 @@ class CommandChain(ChainBase, metaclass=Singleton): command_data["pid"] = plugin_id commands[cmd] = command_data - # 触发事件允许可以拦截和调整命令 - commands = {} + # 初始化命令字典 + commands: Dict[str, dict] = {} add_commands(self._preset_commands, "preset") add_commands(self._plugin_commands, "plugin") add_commands(self._other_commands, "other") + + # 触发事件允许可以拦截和调整命令 event_data = CommandRegisterEventData(commands=commands, origin="CommandChain", service=None) - return eventmanager.send_event(ChainEventType.CommandRegister, event_data), commands + event = eventmanager.send_event(ChainEventType.CommandRegister, event_data) + return event, commands def __build_plugin_commands(self, pid: Optional[str] = None) -> Dict[str, dict]: """ diff --git a/app/modules/telegram/__init__.py b/app/modules/telegram/__init__.py index 2e8c6b9f..a0e29c23 100644 --- a/app/modules/telegram/__init__.py +++ b/app/modules/telegram/__init__.py @@ -1,12 +1,16 @@ +import copy import json from typing import Optional, Union, List, Tuple, Any, Dict from app.core.context import MediaInfo, Context +from app.core.event import eventmanager from app.log import logger from app.modules import _ModuleBase, _MessageBase from app.modules.telegram.telegram import Telegram from app.schemas import MessageChannel, CommingMessage, Notification -from app.schemas.types import ModuleType +from app.schemas.event import CommandRegisterEventData +from app.schemas.types import ModuleType, ChainEventType +from app.utils.structures import DictUtils class TelegramModule(_ModuleBase, _MessageBase[Telegram]): @@ -189,5 +193,41 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): 注册命令,实现这个函数接收系统可用的命令菜单 :param commands: 命令字典 """ - for client in self.get_instances().values(): - client.register_commands(commands) + for client_config in self.get_configs().values(): + client = self.get_instance(client_config.name) + if not client: + continue + + # 触发事件,允许调整命令数据,这里需要进行深复制,避免实例共享 + scoped_commands = copy.deepcopy(commands) + event = eventmanager.send_event( + ChainEventType.CommandRegister, + CommandRegisterEventData(commands=scoped_commands, origin="Telegram", service=client_config.name) + ) + + # 如果事件返回有效的 event_data,使用事件中调整后的命令 + if event and event.event_data: + event_data: CommandRegisterEventData = event.event_data + # 如果事件被取消,跳过命令注册,并清理菜单 + if event_data.cancel: + client.delete_commands() + logger.debug( + f"Command registration for {client_config.name} canceled by event: {event_data.source}" + ) + continue + scoped_commands = event_data.commands or {} + if not scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_commands() + + # scoped_commands 必须是 commands 的子集 + filtered_scoped_commands = DictUtils.filter_keys_to_subset(scoped_commands, commands) + # 如果 filtered_scoped_commands 为空,则跳过注册 + if not filtered_scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_commands() + continue + # 对比调整后的命令与当前命令 + if filtered_scoped_commands != commands: + logger.debug(f"Command set has changed, Updating new commands: {filtered_scoped_commands}") + client.register_commands(filtered_scoped_commands) diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 3652840b..734eda34 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -256,6 +256,15 @@ class Telegram: ] ) + def delete_commands(self): + """ + 清理菜单命令 + """ + if not self._bot: + return + # 清理菜单命令 + self._bot.delete_my_commands() + def stop(self): """ 停止Telegram消息接收服务 diff --git a/app/modules/wechat/__init__.py b/app/modules/wechat/__init__.py index 67690f84..eb721007 100644 --- a/app/modules/wechat/__init__.py +++ b/app/modules/wechat/__init__.py @@ -1,14 +1,18 @@ +import copy import xml.dom.minidom from typing import Optional, Union, List, Tuple, Any, Dict from app.core.context import Context, MediaInfo +from app.core.event import eventmanager from app.log import logger from app.modules import _ModuleBase, _MessageBase from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt from app.modules.wechat.wechat import WeChat from app.schemas import MessageChannel, CommingMessage, Notification -from app.schemas.types import ModuleType +from app.schemas.event import CommandRegisterEventData +from app.schemas.types import ModuleType, ChainEventType from app.utils.dom import DomUtils +from app.utils.structures import DictUtils class WechatModule(_ModuleBase, _MessageBase[WeChat]): @@ -222,7 +226,42 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]): # 如果没有配置消息解密相关参数,则也没有必要进行菜单初始化 if not client_config.config.get("WECHAT_ENCODING_AESKEY") or not client_config.config.get("WECHAT_TOKEN"): logger.debug(f"{client_config.name} 缺少消息解密参数,跳过后续菜单初始化") - else: - client = self.get_instance(client_config.name) - if client: - client.create_menus(commands) + continue + + client = self.get_instance(client_config.name) + if not client: + continue + + # 触发事件,允许调整命令数据,这里需要进行深复制,避免实例共享 + scoped_commands = copy.deepcopy(commands) + event = eventmanager.send_event( + ChainEventType.CommandRegister, + CommandRegisterEventData(commands=scoped_commands, origin="WeChat", service=client_config.name) + ) + + # 如果事件返回有效的 event_data,使用事件中调整后的命令 + if event and event.event_data: + event_data: CommandRegisterEventData = event.event_data + # 如果事件被取消,跳过命令注册,并清理菜单 + if event_data.cancel: + client.delete_menus() + logger.debug( + f"Command registration for {client_config.name} canceled by event: {event_data.source}" + ) + continue + scoped_commands = event_data.commands or {} + if not scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_menus() + + # scoped_commands 必须是 commands 的子集 + filtered_scoped_commands = DictUtils.filter_keys_to_subset(scoped_commands, commands) + # 如果 filtered_scoped_commands 为空,则跳过注册 + if not filtered_scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_menus() + continue + # 对比调整后的命令与当前命令 + if filtered_scoped_commands != commands: + logger.debug(f"Command set has changed, Updating new commands: {filtered_scoped_commands}") + client.create_menus(filtered_scoped_commands) diff --git a/app/modules/wechat/wechat.py b/app/modules/wechat/wechat.py index 89d13507..e93a44a3 100644 --- a/app/modules/wechat/wechat.py +++ b/app/modules/wechat/wechat.py @@ -252,51 +252,55 @@ class WeChat: :param link: 跳转链接 :return: 发送状态,错误信息 """ - if not self.__get_access_token(): - logger.error("获取微信access_token失败,请检查参数配置") - return None + try: + if not self.__get_access_token(): + logger.error("获取微信access_token失败,请检查参数配置") + return None - if image: - ret_code = self.__send_image_message(title=title, text=text, image_url=image, userid=userid, link=link) - else: - ret_code = self.__send_message(title=title, text=text, userid=userid, link=link) + if image: + ret_code = self.__send_image_message(title=title, text=text, image_url=image, userid=userid, link=link) + else: + ret_code = self.__send_message(title=title, text=text, userid=userid, link=link) - return ret_code + return ret_code + except Exception as e: + logger.error(f"发送消息失败:{e}") + return False def send_medias_msg(self, medias: List[MediaInfo], userid: str = "") -> Optional[bool]: """ 发送列表类消息 """ - if not self.__get_access_token(): - logger.error("获取微信access_token失败,请检查参数配置") - return None - - if not userid: - userid = "@all" - articles = [] - index = 1 - for media in medias: - if media.vote_average: - title = f"{index}. {media.title_year}\n类型:{media.type.value},评分:{media.vote_average}" - else: - title = f"{index}. {media.title_year}\n类型:{media.type.value}" - articles.append({ - "title": title, - "description": "", - "picurl": media.get_message_image() if index == 1 else media.get_poster_image(), - "url": media.detail_link - }) - index += 1 - - req_json = { - "touser": userid, - "msgtype": "news", - "agentid": self._appid, - "news": { - "articles": articles - } - } try: + if not self.__get_access_token(): + logger.error("获取微信access_token失败,请检查参数配置") + return None + + if not userid: + userid = "@all" + articles = [] + index = 1 + for media in medias: + if media.vote_average: + title = f"{index}. {media.title_year}\n类型:{media.type.value},评分:{media.vote_average}" + else: + title = f"{index}. {media.title_year}\n类型:{media.type.value}" + articles.append({ + "title": title, + "description": "", + "picurl": media.get_message_image() if index == 1 else media.get_poster_image(), + "url": media.detail_link + }) + index += 1 + + req_json = { + "touser": userid, + "msgtype": "news", + "agentid": self._appid, + "news": { + "articles": articles + } + } return self.__post_request(self._send_msg_url, req_json) except Exception as e: logger.error(f"发送消息失败:{e}") @@ -307,49 +311,49 @@ class WeChat: """ 发送列表消息 """ - if not self.__get_access_token(): - logger.error("获取微信access_token失败,请检查参数配置") - return None - - # 先发送标题 - if title: - self.__send_message(title=title, userid=userid, link=link) - - # 发送列表 - if not userid: - userid = "@all" - articles = [] - index = 1 - for context in torrents: - torrent = context.torrent_info - meta = MetaInfo(title=torrent.title, subtitle=torrent.description) - mediainfo = context.media_info - torrent_title = f"{index}.【{torrent.site_name}】" \ - f"{meta.season_episode} " \ - f"{meta.resource_term} " \ - f"{meta.video_term} " \ - f"{meta.release_group} " \ - f"{StringUtils.str_filesize(torrent.size)} " \ - f"{torrent.volume_factor} " \ - f"{torrent.seeders}↑" - torrent_title = re.sub(r"\s+", " ", torrent_title).strip() - articles.append({ - "title": torrent_title, - "description": torrent.description if index == 1 else "", - "picurl": mediainfo.get_message_image() if index == 1 else "", - "url": torrent.page_url - }) - index += 1 - - req_json = { - "touser": userid, - "msgtype": "news", - "agentid": self._appid, - "news": { - "articles": articles - } - } try: + if not self.__get_access_token(): + logger.error("获取微信access_token失败,请检查参数配置") + return None + + # 先发送标题 + if title: + self.__send_message(title=title, userid=userid, link=link) + + # 发送列表 + if not userid: + userid = "@all" + articles = [] + index = 1 + for context in torrents: + torrent = context.torrent_info + meta = MetaInfo(title=torrent.title, subtitle=torrent.description) + mediainfo = context.media_info + torrent_title = f"{index}.【{torrent.site_name}】" \ + f"{meta.season_episode} " \ + f"{meta.resource_term} " \ + f"{meta.video_term} " \ + f"{meta.release_group} " \ + f"{StringUtils.str_filesize(torrent.size)} " \ + f"{torrent.volume_factor} " \ + f"{torrent.seeders}↑" + torrent_title = re.sub(r"\s+", " ", torrent_title).strip() + articles.append({ + "title": torrent_title, + "description": torrent.description if index == 1 else "", + "picurl": mediainfo.get_message_image() if index == 1 else "", + "url": torrent.page_url + }) + index += 1 + + req_json = { + "touser": userid, + "msgtype": "news", + "agentid": self._appid, + "news": { + "articles": articles + } + } return self.__post_request(self._send_msg_url, req_json) except Exception as e: logger.error(f"发送消息失败:{e}") @@ -424,49 +428,53 @@ class WeChat: ] } """ - # 请求URL - req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid) + try: + # 请求URL + req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid) - # 对commands按category分组 - category_dict = {} - for key, value in commands.items(): - category: str = value.get("category") - if category: - if not category_dict.get(category): - category_dict[category] = {} - category_dict[category][key] = value + # 对commands按category分组 + category_dict = {} + for key, value in commands.items(): + category: str = value.get("category") + if category: + if not category_dict.get(category): + category_dict[category] = {} + category_dict[category][key] = value - # 一级菜单 - buttons = [] - for category, menu in category_dict.items(): - # 二级菜单 - sub_buttons = [] - for key, value in menu.items(): - sub_buttons.append({ - "type": "click", - "name": value.get("description"), - "key": key + # 一级菜单 + buttons = [] + for category, menu in category_dict.items(): + # 二级菜单 + sub_buttons = [] + for key, value in menu.items(): + sub_buttons.append({ + "type": "click", + "name": value.get("description"), + "key": key + }) + buttons.append({ + "name": category, + "sub_button": sub_buttons[:5] }) - buttons.append({ - "name": category, - "sub_button": sub_buttons[:5] - }) - if buttons: - # 发送请求 - try: + if buttons: + # 发送请求 self.__post_request(req_url, { "button": buttons[:3] }) - except Exception as e: - logger.error(f"创建菜单失败:{e}") - return False + except Exception as e: + logger.error(f"创建菜单失败:{e}") + return False def delete_menus(self): """ 删除微信菜单 """ - # 请求URL - req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) - # 发送请求 - RequestUtils().get(req_url) + try: + # 请求URL + req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) + # 发送请求 + RequestUtils().get(req_url) + except Exception as e: + logger.error(f"删除菜单失败:{e}") + return False diff --git a/app/utils/structures.py b/app/utils/structures.py new file mode 100644 index 00000000..2e2fb651 --- /dev/null +++ b/app/utils/structures.py @@ -0,0 +1,72 @@ +from typing import Dict, List, Set, TypeVar, Any, Union + +K = TypeVar("K") +V = TypeVar("V") + + +class DictUtils: + @staticmethod + def filter_keys_to_subset(source: Dict[K, V], reference: Dict[K, V]) -> Dict[K, V]: + """ + 过滤 source 字典,使其键成为 reference 字典键的子集 + + :param source: 要被过滤的字典 + :param reference: 参考字典,定义允许的键 + :return: 过滤后的字典,只包含在 reference 中存在的键 + """ + if not isinstance(source, dict) or not isinstance(reference, dict): + return {} + + return {key: value for key, value in source.items() if key in reference} + + @staticmethod + def is_keys_subset(source: Dict[K, V], reference: Dict[K, V]) -> bool: + """ + 判断 source 字典的键是否为 reference 字典键的子集 + + :param source: 要检查的字典 + :param reference: 参考字典 + :return: 如果 source 的键是 reference 的键子集,则返回 True,否则返回 False + """ + if not isinstance(source, dict) or not isinstance(reference, dict): + return False + + return all(key in reference for key in source) + + +class ListUtils: + @staticmethod + def flatten(nested_list: Union[List[List[Any]], List[Any]]) -> List[Any]: + """ + 将嵌套的列表展平成单个列表 + + :param nested_list: 嵌套的列表 + :return: 展平后的列表 + """ + if not isinstance(nested_list, list): + return [] + + # 检查是否嵌套,若不嵌套直接返回 + if not any(isinstance(sublist, list) for sublist in nested_list): + return nested_list + + return [item for sublist in nested_list if isinstance(sublist, list) for item in sublist] + + +class SetUtils: + @staticmethod + def flatten(nested_sets: Union[Set[Set[Any]], Set[Any]]) -> Set[Any]: + """ + 将嵌套的集合展开为单个集合 + + :param nested_sets: 嵌套的集合 + :return: 展开的集合 + """ + if not isinstance(nested_sets, set): + return set() + + # 检查是否嵌套,若不嵌套直接返回 + if not any(isinstance(subset, set) for subset in nested_sets): + return nested_sets + + return {item for subset in nested_sets if isinstance(subset, set) for item in subset}