From 915388c109cbacee99de71414cebe6dcc948fb37 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Sat, 26 Oct 2024 04:51:45 +0800 Subject: [PATCH] feat(commands): support sending CommandRegister events for clients --- app/chain/command.py | 37 +++++++++++----- app/modules/telegram/__init__.py | 46 ++++++++++++++++++-- app/modules/wechat/__init__.py | 49 +++++++++++++++++++--- app/utils/structures.py | 72 ++++++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 19 deletions(-) create mode 100644 app/utils/structures.py diff --git a/app/chain/command.py b/app/chain/command.py index 45003a76..ec5a79a8 100644 --- a/app/chain/command.py +++ b/app/chain/command.py @@ -20,6 +20,7 @@ from app.schemas.event import CommandRegisterEventData from app.schemas.types import EventType, MessageChannel, ChainEventType from app.utils.object import ObjectUtils from app.utils.singleton import Singleton +from app.utils.structures import DictUtils class CommandChain(ChainBase, metaclass=Singleton): @@ -173,29 +174,40 @@ class CommandChain(ChainBase, metaclass=Singleton): **self._other_commands } + # 强制触发注册 + force_register = False # 触发事件允许可以拦截和调整命令 event, initial_commands = self.__trigger_register_commands_event() - # 如果事件返回有效的 event_data,使用事件中调整后的命令 if event and event.event_data: + # 如果事件返回有效的 event_data,使用事件中调整后的命令 event_data: CommandRegisterEventData = event.event_data + # 如果事件被取消,跳过命令注册 + if event_data.cancel: + logger.debug(f"Command initialization canceled by event: {event_data.source}") + return + # 如果拦截源与插件标识一致时,这里认为需要强制触发注册 + if pid is not None and pid == event_data.source: + force_register = True initial_commands = event_data.commands or {} logger.debug(f"Registering command count from event: {len(initial_commands)}") else: logger.debug(f"Registering initial command count: {len(initial_commands)}") # initial_commands 必须是 self._commands 的子集 - filtered_initial_commands = { - cmd: details for cmd, details in initial_commands.items() if cmd in self._commands - } + filtered_initial_commands = DictUtils.filter_keys_to_subset(initial_commands, self._commands) + # 如果 filtered_initial_commands 为空,则跳过注册 + if not filtered_initial_commands and not force_register: + logger.debug("Filtered commands are empty, skipping registration.") + return # 对比调整后的命令与当前命令 - if filtered_initial_commands == self._registered_commands: - logger.debug("Command set unchanged, skipping broadcast registration.") - else: - logger.debug("Command set has changed, Updating and broadcasting new commands.") + if filtered_initial_commands != self._registered_commands or force_register: + logger.debug("Command set has changed or force registration is enabled.") self._registered_commands = filtered_initial_commands super().register_commands(commands=filtered_initial_commands) + else: + logger.debug("Command set unchanged, skipping broadcast registration.") except Exception as e: logger.error(f"Error occurred during command initialization in background: {e}", exc_info=True) @@ -220,13 +232,16 @@ class CommandChain(ChainBase, metaclass=Singleton): command_data["pid"] = plugin_id commands[cmd] = command_data - # 触发事件允许可以拦截和调整命令 - commands = {} + # 初始化命令字典 + commands: Dict[str, dict] = {} add_commands(self._preset_commands, "preset") add_commands(self._plugin_commands, "plugin") add_commands(self._other_commands, "other") + + # 触发事件允许可以拦截和调整命令 event_data = CommandRegisterEventData(commands=commands, origin="CommandChain", service=None) - return eventmanager.send_event(ChainEventType.CommandRegister, event_data), commands + event = eventmanager.send_event(ChainEventType.CommandRegister, event_data) + return event, commands def __build_plugin_commands(self, pid: Optional[str] = None) -> Dict[str, dict]: """ diff --git a/app/modules/telegram/__init__.py b/app/modules/telegram/__init__.py index 2e8c6b9f..a0e29c23 100644 --- a/app/modules/telegram/__init__.py +++ b/app/modules/telegram/__init__.py @@ -1,12 +1,16 @@ +import copy import json from typing import Optional, Union, List, Tuple, Any, Dict from app.core.context import MediaInfo, Context +from app.core.event import eventmanager from app.log import logger from app.modules import _ModuleBase, _MessageBase from app.modules.telegram.telegram import Telegram from app.schemas import MessageChannel, CommingMessage, Notification -from app.schemas.types import ModuleType +from app.schemas.event import CommandRegisterEventData +from app.schemas.types import ModuleType, ChainEventType +from app.utils.structures import DictUtils class TelegramModule(_ModuleBase, _MessageBase[Telegram]): @@ -189,5 +193,41 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): 注册命令,实现这个函数接收系统可用的命令菜单 :param commands: 命令字典 """ - for client in self.get_instances().values(): - client.register_commands(commands) + for client_config in self.get_configs().values(): + client = self.get_instance(client_config.name) + if not client: + continue + + # 触发事件,允许调整命令数据,这里需要进行深复制,避免实例共享 + scoped_commands = copy.deepcopy(commands) + event = eventmanager.send_event( + ChainEventType.CommandRegister, + CommandRegisterEventData(commands=scoped_commands, origin="Telegram", service=client_config.name) + ) + + # 如果事件返回有效的 event_data,使用事件中调整后的命令 + if event and event.event_data: + event_data: CommandRegisterEventData = event.event_data + # 如果事件被取消,跳过命令注册,并清理菜单 + if event_data.cancel: + client.delete_commands() + logger.debug( + f"Command registration for {client_config.name} canceled by event: {event_data.source}" + ) + continue + scoped_commands = event_data.commands or {} + if not scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_commands() + + # scoped_commands 必须是 commands 的子集 + filtered_scoped_commands = DictUtils.filter_keys_to_subset(scoped_commands, commands) + # 如果 filtered_scoped_commands 为空,则跳过注册 + if not filtered_scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_commands() + continue + # 对比调整后的命令与当前命令 + if filtered_scoped_commands != commands: + logger.debug(f"Command set has changed, Updating new commands: {filtered_scoped_commands}") + client.register_commands(filtered_scoped_commands) diff --git a/app/modules/wechat/__init__.py b/app/modules/wechat/__init__.py index 67690f84..eb721007 100644 --- a/app/modules/wechat/__init__.py +++ b/app/modules/wechat/__init__.py @@ -1,14 +1,18 @@ +import copy import xml.dom.minidom from typing import Optional, Union, List, Tuple, Any, Dict from app.core.context import Context, MediaInfo +from app.core.event import eventmanager from app.log import logger from app.modules import _ModuleBase, _MessageBase from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt from app.modules.wechat.wechat import WeChat from app.schemas import MessageChannel, CommingMessage, Notification -from app.schemas.types import ModuleType +from app.schemas.event import CommandRegisterEventData +from app.schemas.types import ModuleType, ChainEventType from app.utils.dom import DomUtils +from app.utils.structures import DictUtils class WechatModule(_ModuleBase, _MessageBase[WeChat]): @@ -222,7 +226,42 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]): # 如果没有配置消息解密相关参数,则也没有必要进行菜单初始化 if not client_config.config.get("WECHAT_ENCODING_AESKEY") or not client_config.config.get("WECHAT_TOKEN"): logger.debug(f"{client_config.name} 缺少消息解密参数,跳过后续菜单初始化") - else: - client = self.get_instance(client_config.name) - if client: - client.create_menus(commands) + continue + + client = self.get_instance(client_config.name) + if not client: + continue + + # 触发事件,允许调整命令数据,这里需要进行深复制,避免实例共享 + scoped_commands = copy.deepcopy(commands) + event = eventmanager.send_event( + ChainEventType.CommandRegister, + CommandRegisterEventData(commands=scoped_commands, origin="WeChat", service=client_config.name) + ) + + # 如果事件返回有效的 event_data,使用事件中调整后的命令 + if event and event.event_data: + event_data: CommandRegisterEventData = event.event_data + # 如果事件被取消,跳过命令注册,并清理菜单 + if event_data.cancel: + client.delete_menus() + logger.debug( + f"Command registration for {client_config.name} canceled by event: {event_data.source}" + ) + continue + scoped_commands = event_data.commands or {} + if not scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_menus() + + # scoped_commands 必须是 commands 的子集 + filtered_scoped_commands = DictUtils.filter_keys_to_subset(scoped_commands, commands) + # 如果 filtered_scoped_commands 为空,则跳过注册 + if not filtered_scoped_commands: + logger.debug("Filtered commands are empty, skipping registration.") + client.delete_menus() + continue + # 对比调整后的命令与当前命令 + if filtered_scoped_commands != commands: + logger.debug(f"Command set has changed, Updating new commands: {filtered_scoped_commands}") + client.create_menus(filtered_scoped_commands) diff --git a/app/utils/structures.py b/app/utils/structures.py new file mode 100644 index 00000000..2e2fb651 --- /dev/null +++ b/app/utils/structures.py @@ -0,0 +1,72 @@ +from typing import Dict, List, Set, TypeVar, Any, Union + +K = TypeVar("K") +V = TypeVar("V") + + +class DictUtils: + @staticmethod + def filter_keys_to_subset(source: Dict[K, V], reference: Dict[K, V]) -> Dict[K, V]: + """ + 过滤 source 字典,使其键成为 reference 字典键的子集 + + :param source: 要被过滤的字典 + :param reference: 参考字典,定义允许的键 + :return: 过滤后的字典,只包含在 reference 中存在的键 + """ + if not isinstance(source, dict) or not isinstance(reference, dict): + return {} + + return {key: value for key, value in source.items() if key in reference} + + @staticmethod + def is_keys_subset(source: Dict[K, V], reference: Dict[K, V]) -> bool: + """ + 判断 source 字典的键是否为 reference 字典键的子集 + + :param source: 要检查的字典 + :param reference: 参考字典 + :return: 如果 source 的键是 reference 的键子集,则返回 True,否则返回 False + """ + if not isinstance(source, dict) or not isinstance(reference, dict): + return False + + return all(key in reference for key in source) + + +class ListUtils: + @staticmethod + def flatten(nested_list: Union[List[List[Any]], List[Any]]) -> List[Any]: + """ + 将嵌套的列表展平成单个列表 + + :param nested_list: 嵌套的列表 + :return: 展平后的列表 + """ + if not isinstance(nested_list, list): + return [] + + # 检查是否嵌套,若不嵌套直接返回 + if not any(isinstance(sublist, list) for sublist in nested_list): + return nested_list + + return [item for sublist in nested_list if isinstance(sublist, list) for item in sublist] + + +class SetUtils: + @staticmethod + def flatten(nested_sets: Union[Set[Set[Any]], Set[Any]]) -> Set[Any]: + """ + 将嵌套的集合展开为单个集合 + + :param nested_sets: 嵌套的集合 + :return: 展开的集合 + """ + if not isinstance(nested_sets, set): + return set() + + # 检查是否嵌套,若不嵌套直接返回 + if not any(isinstance(subset, set) for subset in nested_sets): + return nested_sets + + return {item for subset in nested_sets if isinstance(subset, set) for item in subset}