diff --git a/app/agent/tools/base.py b/app/agent/tools/base.py index 761230e3..eef35627 100644 --- a/app/agent/tools/base.py +++ b/app/agent/tools/base.py @@ -8,8 +8,11 @@ from pydantic import PrivateAttr from app.agent import StreamingHandler from app.chain import ChainBase from app.core.config import settings +from app.db.user_oper import UserOper +from app.helper.service import ServiceConfigHelper from app.log import logger from app.schemas import Notification +from app.schemas.types import MessageChannel class ToolChain(ChainBase): @@ -43,8 +46,13 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): 2. 持久化工具调用记录到会话记忆 3. 调用具体工具逻辑(子类实现的 execute 方法) 4. 持久化工具结果到会话记忆 + 5. 权限检查 """ + permission_result = await self._check_permission() + if permission_result: + return permission_result + # 获取工具执行提示消息 tool_message = self.get_tool_message(**kwargs) if not tool_message: @@ -132,6 +140,109 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): """ self._stream_handler = stream_handler + async def _check_permission(self) -> Optional[str]: + """ + 检查用户权限: + 1. 首先检查用户是否是渠道管理员 + 2. 如果渠道没有设置管理员名单,则检查用户是否是系统管理员 + 3. 如果都不是系统管理员,检查用户ID是否等于渠道配置的用户ID + 4. 如果都不是,返回权限拒绝消息 + """ + if not self._channel or not self._source: + return None + + user_id_str = str(self._user_id) if self._user_id else None + + channel_type_map = { + MessageChannel.Telegram: "telegram", + MessageChannel.Discord: "discord", + MessageChannel.Wechat: "wechat", + MessageChannel.Slack: "slack", + MessageChannel.VoceChat: "vocechat", + MessageChannel.SynologyChat: "synologychat", + MessageChannel.QQBot: "qqbot", + } + + channel_type = None + for key, value in channel_type_map.items(): + if self._channel == key.value: + channel_type = value + break + + if not channel_type: + return None + + admin_key_map = { + "telegram": "TELEGRAM_ADMINS", + "discord": "DISCORD_ADMINS", + "wechat": "WECHAT_ADMINS", + "slack": "SLACK_ADMINS", + "vocechat": "VOCECHAT_ADMINS", + "synologychat": "SYNOLOGYCHAT_ADMINS", + "qqbot": "QQBOT_ADMINS", + } + + user_id_key_map = { + "telegram": "TELEGRAM_CHAT_ID", + "vocechat": "VOCECHAT_CHANNEL_ID", + "wechat": "WECHAT_BOT_CHAT_ID", + } + + admin_key = admin_key_map.get(channel_type) + user_id_key = user_id_key_map.get(channel_type) + + try: + configs = ServiceConfigHelper.get_notification_configs() + for config in configs: + if config.name == self._source and config.config: + channel_admins = config.config.get(admin_key) if admin_key else None + if channel_admins: + admin_list = [ + aid.strip() + for aid in str(channel_admins).split(",") + if aid.strip() + ] + if user_id_str and user_id_str in admin_list: + return None + + user = ( + UserOper().get_by_name(self._username) + if self._username + else None + ) + if user and user.is_superuser: + return None + + return ( + "抱歉,您没有执行此工具的权限。" + "只有渠道管理员或系统管理员才能执行工具操作。" + "如需执行工具,请联系渠道管理员将您的用户ID添加到渠道管理员列表中," + "或联系系统管理员为您设置权限。" + ) + else: + user = ( + UserOper().get_by_name(self._username) + if self._username + else None + ) + if user and user.is_superuser: + return None + + if user_id_key: + config_user_id = config.config.get(user_id_key) + if config_user_id and str(config_user_id) == user_id_str: + return None + + return ( + "抱歉,您没有执行此工具的权限。" + "只有系统管理员才能执行工具操作。" + "如需执行工具,请联系系统管理员为您设置权限。" + ) + except Exception as e: + logger.error(f"检查权限失败: {e}") + + return None + async def send_tool_message(self, message: str, title: str = ""): """ 发送工具消息