From 510476c2147882de10119bb04d94327525795864 Mon Sep 17 00:00:00 2001 From: EkkoG Date: Sat, 14 Mar 2026 16:17:39 +0800 Subject: [PATCH] feat(wechat): add WeChatBot class for intelligent bot integration and enhance WechatModule to support bot mode - Introduced WeChatBot class for handling intelligent bot functionalities. - Updated WechatModule to differentiate between traditional and bot modes using WECHAT_MODE configuration. - Enhanced stop method in WechatModule to gracefully stop client instances. - Added logic to skip traditional menu initialization for bot mode. - Updated .gitignore to include .venv directory. --- app/api/endpoints/message.py | 5 +- app/modules/wechat/__init__.py | 26 +- app/modules/wechat/wechatbot.py | 503 ++++++++++++++++++++++++++++++++ 3 files changed, 531 insertions(+), 3 deletions(-) create mode 100644 app/modules/wechat/wechatbot.py diff --git a/app/api/endpoints/message.py b/app/api/endpoints/message.py index 9ed9d5e5..2cc4a2b5 100644 --- a/app/api/endpoints/message.py +++ b/app/api/endpoints/message.py @@ -86,7 +86,10 @@ def wechat_verify(echostr: str, msg_signature: str, timestamp: Union[str, int], if not client_configs: return "未找到对应的消息配置" client_config = next((config for config in client_configs if - config.type == "wechat" and config.enabled and (not source or config.name == source)), None) + config.type == "wechat" + and config.enabled + and config.config.get("WECHAT_MODE", "app") != "bot" + and (not source or config.name == source)), None) if not client_config: return "未找到对应的消息配置" try: diff --git a/app/modules/wechat/__init__.py b/app/modules/wechat/__init__.py index 2ff8a47d..81a54fd2 100644 --- a/app/modules/wechat/__init__.py +++ b/app/modules/wechat/__init__.py @@ -8,6 +8,7 @@ from app.log import logger from app.modules import _ModuleBase, _MessageBase from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt from app.modules.wechat.wechat import WeChat +from app.modules.wechat.wechatbot import WeChatBot from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData from app.schemas.types import ModuleType, ChainEventType from app.utils.dom import DomUtils @@ -20,8 +21,9 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]): """ 初始化模块 """ + self.stop() super().init_service(service_name=WeChat.__name__.lower(), - service_type=WeChat) + service_type=self._create_client) self._channel = MessageChannel.Wechat @staticmethod @@ -50,7 +52,22 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]): return 1 def stop(self): - pass + for client in self.get_instances().values(): + if hasattr(client, "stop"): + try: + client.stop() + except Exception as err: + logger.error(f"停止微信模块实例失败:{err}") + + @staticmethod + def _is_bot_mode(config: dict) -> bool: + return (config or {}).get("WECHAT_MODE", "app") == "bot" + + @classmethod + def _create_client(cls, conf): + if cls._is_bot_mode(conf.config): + return WeChatBot(name=conf.name, **conf.config) + return WeChat(name=conf.name, **conf.config) def test(self) -> Optional[Tuple[bool, str]]: """ @@ -85,6 +102,8 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]): client_config = self.get_config(source) if not client_config: return None + if self._is_bot_mode(client_config.config): + return None client: WeChat = self.get_instance(client_config.name) # URL参数 sVerifyMsgSig = args.get("msg_signature") @@ -229,6 +248,9 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]): :param commands: 命令字典 """ for client_config in self.get_configs().values(): + if self._is_bot_mode(client_config.config): + logger.debug(f"{client_config.name} 为智能机器人模式,跳过传统菜单初始化") + continue # 如果没有配置消息解密相关参数,则也没有必要进行菜单初始化 if not client_config.config.get("WECHAT_ENCODING_AESKEY") or not client_config.config.get("WECHAT_TOKEN"): logger.debug(f"{client_config.name} 缺少消息解密参数,跳过后续菜单初始化") diff --git a/app/modules/wechat/wechatbot.py b/app/modules/wechat/wechatbot.py new file mode 100644 index 00000000..65250605 --- /dev/null +++ b/app/modules/wechat/wechatbot.py @@ -0,0 +1,503 @@ +import hashlib +import json +import pickle +import re +import threading +import time +import uuid +from typing import Optional, List, Dict, Tuple, Set + +import websocket + +from app.chain.message import MessageChain +from app.core.cache import FileCache +from app.core.context import MediaInfo, Context +from app.core.metainfo import MetaInfo +from app.log import logger +from app.schemas.types import MessageChannel +from app.utils.string import StringUtils + + +class WeChatBot: + """ + 企业微信智能机器人(长连接模式) + 固定使用: + - dmPolicy = open + - groupPolicy = disabled + """ + + _default_ws_url = "wss://openws.work.weixin.qq.com" + _heartbeat_interval = 30 + _ack_timeout = 10 + + def __init__(self, + WECHAT_BOT_ID: Optional[str] = None, + WECHAT_BOT_SECRET: Optional[str] = None, + WECHAT_BOT_CHAT_ID: Optional[str] = None, + WECHAT_BOT_WS_URL: Optional[str] = None, + WECHAT_ADMINS: Optional[str] = None, + name: Optional[str] = None, + **kwargs): + self._config_name = name or "wechat" + self._bot_id = WECHAT_BOT_ID + self._bot_secret = WECHAT_BOT_SECRET + self._default_chat_id = WECHAT_BOT_CHAT_ID.strip() if WECHAT_BOT_CHAT_ID else None + self._ws_url = WECHAT_BOT_WS_URL or self._default_ws_url + self._admins = [item.strip() for item in (WECHAT_ADMINS or "").split(",") if item.strip()] + safe_name = hashlib.md5(self._config_name.encode()).hexdigest()[:12] + self._cache_key = f"__wechatbot_known_targets_{safe_name}__" + self._filecache = FileCache() + self._known_targets: Set[str] = set() + + self._ready = False + self._ws_app: Optional[websocket.WebSocketApp] = None + self._ws_thread: Optional[threading.Thread] = None + self._heartbeat_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._authenticated = threading.Event() + self._send_lock = threading.Lock() + self._acks_lock = threading.Lock() + self._pending_acks: Dict[str, dict] = {} + + if not self._bot_id or not self._bot_secret: + logger.error("企业微信智能机器人配置不完整!") + return + + self._load_known_targets() + self._ready = True + self._start_gateway() + + @staticmethod + def _build_req_id(prefix: str) -> str: + return f"{prefix}_{uuid.uuid4().hex}" + + @staticmethod + def _split_content(content: str, max_bytes: int = 4000) -> List[str]: + """ + 将 markdown 内容拆分为较小分块,避免消息过长发送失败 + """ + if not content: + return [] + + chunks = [] + current = bytearray() + for line in content.splitlines(): + encoded = (line + "\n").encode("utf-8") + if len(encoded) > max_bytes: + if current: + chunks.append(current.decode("utf-8", errors="replace").strip()) + current = bytearray() + start = 0 + while start < len(encoded): + end = min(start + max_bytes, len(encoded)) + while end > start and end < len(encoded) and (encoded[end] & 0xC0) == 0x80: + end -= 1 + chunks.append(encoded[start:end].decode("utf-8", errors="replace").strip()) + start = end + continue + + if len(current) + len(encoded) > max_bytes: + chunks.append(current.decode("utf-8", errors="replace").strip()) + current = bytearray() + current += encoded + + if current: + chunks.append(current.decode("utf-8", errors="replace").strip()) + + return [chunk for chunk in chunks if chunk] + + def _start_gateway(self) -> None: + if self._ws_thread and self._ws_thread.is_alive(): + return + + self._stop_event.clear() + self._ws_thread = threading.Thread(target=self._run_gateway, daemon=True) + self._ws_thread.start() + self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True) + self._heartbeat_thread.start() + logger.info(f"企业微信智能机器人长连接已启动:{self._config_name}") + + def stop(self) -> None: + self._stop_event.set() + self._authenticated.clear() + if self._ws_app: + try: + self._ws_app.close() + except Exception as err: + logger.debug(f"关闭企业微信智能机器人连接失败:{err}") + if self._ws_thread and self._ws_thread.is_alive(): + self._ws_thread.join(timeout=5) + if self._heartbeat_thread and self._heartbeat_thread.is_alive(): + self._heartbeat_thread.join(timeout=2) + + def get_state(self) -> bool: + return self._ready and self._authenticated.is_set() + + def _load_known_targets(self) -> None: + try: + content = self._filecache.get(self._cache_key) + if not content: + return + data = pickle.loads(content) + if isinstance(data, (list, set, tuple)): + self._known_targets = {str(item).strip() for item in data if str(item).strip()} + except Exception as err: + logger.debug(f"加载企业微信智能机器人已互动用户失败:{err}") + + def _save_known_targets(self) -> None: + try: + self._filecache.set(self._cache_key, pickle.dumps(sorted(self._known_targets))) + except Exception as err: + logger.debug(f"保存企业微信智能机器人已互动用户失败:{err}") + + def _remember_target(self, userid: Optional[str]) -> None: + target = str(userid).strip() if userid else None + if not target: + return + if target not in self._known_targets: + self._known_targets.add(target) + self._save_known_targets() + + def _run_gateway(self) -> None: + reconnect_delays = [1, 2, 5, 10, 30, 60] + attempt = 0 + + while not self._stop_event.is_set(): + self._authenticated.clear() + try: + self._ws_app = websocket.WebSocketApp( + self._ws_url, + on_open=self._on_open, + on_message=self._on_message, + on_error=self._on_error, + on_close=self._on_close, + ) + self._ws_app.run_forever( + ping_interval=None, + ping_timeout=None, + skip_utf8_validation=True, + ) + except Exception as err: + logger.error(f"企业微信智能机器人连接异常:{err}") + + if self._stop_event.is_set(): + break + + delay = reconnect_delays[min(attempt, len(reconnect_delays) - 1)] + attempt += 1 + logger.info(f"企业微信智能机器人将在 {delay}s 后重连:{self._config_name}") + for _ in range(delay * 10): + if self._stop_event.is_set(): + break + time.sleep(0.1) + + def _heartbeat_loop(self) -> None: + while not self._stop_event.is_set(): + if self._authenticated.is_set(): + try: + self._send_raw({ + "cmd": "ping", + "headers": {"req_id": self._build_req_id("ping")}, + }) + except Exception as err: + logger.debug(f"发送企业微信智能机器人心跳失败:{err}") + for _ in range(self._heartbeat_interval * 10): + if self._stop_event.is_set(): + return + time.sleep(0.1) + + def _on_open(self, ws) -> None: + logger.info(f"企业微信智能机器人连接成功,开始订阅:{self._config_name}") + self._send_raw({ + "cmd": "aibot_subscribe", + "headers": {"req_id": self._build_req_id("aibot_subscribe")}, + "body": { + "bot_id": self._bot_id, + "secret": self._bot_secret, + }, + }) + + def _on_message(self, ws, message: str) -> None: + try: + payload = json.loads(message) + except Exception as err: + logger.error(f"解析企业微信智能机器人消息失败:{err}") + return + + req_id = (payload.get("headers") or {}).get("req_id") + if req_id: + self._resolve_ack(req_id, payload) + + cmd = payload.get("cmd") + if not cmd: + if str(req_id).startswith("aibot_subscribe"): + if payload.get("errcode") == 0: + self._authenticated.set() + logger.info(f"企业微信智能机器人订阅成功:{self._config_name}") + else: + logger.error( + f"企业微信智能机器人订阅失败:{payload.get('errmsg')} ({payload.get('errcode')})" + ) + self._authenticated.clear() + return + + if cmd == "aibot_msg_callback": + self._handle_callback_message(payload) + elif cmd == "aibot_event_callback": + self._handle_callback_event(payload) + + def _on_error(self, ws, error) -> None: + self._authenticated.clear() + logger.error(f"企业微信智能机器人 WebSocket 错误:{error}") + + def _on_close(self, ws, close_status_code, close_msg) -> None: + self._authenticated.clear() + logger.info(f"企业微信智能机器人连接关闭:{close_status_code} {close_msg}") + + def _resolve_ack(self, req_id: str, payload: dict) -> None: + with self._acks_lock: + pending = self._pending_acks.get(req_id) + if not pending: + return + pending["payload"] = payload + pending["event"].set() + + def _send_raw(self, payload: dict) -> None: + if not self._ws_app or not self._ws_app.sock or not self._ws_app.sock.connected: + raise RuntimeError("企业微信智能机器人未连接") + self._ws_app.send(json.dumps(payload, ensure_ascii=False)) + + def _send_with_ack(self, payload: dict) -> bool: + req_id = (payload.get("headers") or {}).get("req_id") + if not req_id: + return False + + if not self._authenticated.wait(timeout=self._ack_timeout): + logger.error("企业微信智能机器人未完成认证,无法发送消息") + return False + + pending = {"event": threading.Event(), "payload": None} + with self._acks_lock: + self._pending_acks[req_id] = pending + + try: + with self._send_lock: + self._send_raw(payload) + if not pending["event"].wait(timeout=self._ack_timeout): + logger.error(f"企业微信智能机器人消息发送超时:req_id={req_id}") + return False + ack = pending["payload"] or {} + if ack.get("errcode") != 0: + logger.error( + f"企业微信智能机器人消息发送失败:{ack.get('errmsg')} ({ack.get('errcode')})" + ) + return False + return True + finally: + with self._acks_lock: + self._pending_acks.pop(req_id, None) + + def _handle_callback_event(self, payload: dict) -> None: + event = ((payload.get("body") or {}).get("event") or {}).get("eventtype") + if event == "disconnected_event": + logger.info(f"企业微信智能机器人旧连接被踢下线:{self._config_name}") + + @staticmethod + def _extract_text_from_body(body: dict) -> Optional[str]: + msgtype = body.get("msgtype") + text_parts = [] + + if msgtype == "text": + text = ((body.get("text") or {}).get("content") or "").strip() + if text: + text_parts.append(text) + elif msgtype == "voice": + text = ((body.get("voice") or {}).get("content") or "").strip() + if text: + text_parts.append(text) + elif msgtype == "mixed": + for item in (body.get("mixed") or {}).get("msg_item") or []: + if item.get("msgtype") == "text": + content = ((item.get("text") or {}).get("content") or "").strip() + if content: + text_parts.append(content) + + quote = body.get("quote") or {} + if not text_parts and quote.get("msgtype") == "text": + quote_text = ((quote.get("text") or {}).get("content") or "").strip() + if quote_text: + text_parts.append(quote_text) + + text = "\n".join(part for part in text_parts if part).strip() + return text or None + + def _handle_callback_message(self, payload: dict) -> None: + body = payload.get("body") or {} + sender = ((body.get("from") or {}).get("userid") or "").strip() + if not sender: + return + + if body.get("chattype") == "group": + logger.debug(f"企业微信智能机器人忽略群聊消息(groupPolicy=disabled):{self._config_name}") + return + + text = self._extract_text_from_body(body) + if not text: + return + + text = re.sub(r"@\S+", "", text).strip() + if not text: + return + + self._remember_target(sender) + + if text.startswith("/") and self._admins and sender not in self._admins: + self.send_msg(title="只有管理员才有权限执行此命令", userid=sender) + return + + logger.info(f"收到来自 {self._config_name} 的企业微信智能机器人消息:userid={sender}, text={text}") + self._forward_to_message_chain(userid=sender, text=text) + + def _forward_to_message_chain(self, userid: str, text: str) -> None: + def _run(): + try: + MessageChain().handle_message( + channel=MessageChannel.Wechat, + source=self._config_name, + userid=userid, + username=userid, + text=text, + ) + except Exception as err: + logger.error(f"企业微信智能机器人转发消息失败:{err}") + + threading.Thread(target=_run, daemon=True).start() + + @staticmethod + def _normalize_target(userid: Optional[str], default_chat_id: Optional[str]) -> Tuple[Optional[str], int]: + target = str(userid).strip() if userid else (default_chat_id.strip() if default_chat_id else None) + if not target: + return None, 1 + + lowered = target.lower() + if lowered.startswith("group:"): + return target[6:].strip(), 2 + if lowered.startswith("user:"): + return target[5:].strip(), 1 + return target, 1 + + @staticmethod + def _build_markdown(title: Optional[str] = None, + text: Optional[str] = None, + image: Optional[str] = None, + link: Optional[str] = None) -> str: + parts = [] + if title: + parts.append(f"**{title}**") + if text: + parts.append(text.replace("\n\n", "\n")) + if image: + parts.append(f"![]({image})") + if link: + parts.append(f"[点击查看]({link})") + return "\n\n".join(part for part in parts if part).strip() + + def _resolve_targets(self, userid: Optional[str] = None) -> List[Tuple[str, int]]: + target, chat_type = self._normalize_target(userid=userid, default_chat_id=self._default_chat_id) + if target: + return [(target, chat_type)] + return [(known_userid, 1) for known_userid in sorted(self._known_targets)] + + def _send_markdown(self, content: str, userid: Optional[str] = None) -> Optional[bool]: + if not content: + return False + + targets = self._resolve_targets(userid=userid) + if not targets: + logger.warning(f"{self._config_name} 未配置默认发送目标,且暂无已互动用户") + return False + + send_success = False + for target, chat_type in targets: + target_success = True + for chunk in self._split_content(content): + req_id = self._build_req_id("aibot_send_msg") + payload = { + "cmd": "aibot_send_msg", + "headers": {"req_id": req_id}, + "body": { + "chatid": target, + "chat_type": chat_type, + "msgtype": "markdown", + "markdown": { + "content": chunk + } + } + } + if not self._send_with_ack(payload): + target_success = False + logger.warning(f"{self._config_name} 向目标 {target} 发送通知失败") + break + send_success = send_success or target_success + return send_success + + def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None, + userid: Optional[str] = None, link: Optional[str] = None) -> Optional[bool]: + content = self._build_markdown(title=title, text=text, image=image, link=link) + return self._send_markdown(content=content, userid=userid) + + def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None) -> Optional[bool]: + if not medias: + return False + + lines = ["**媒体列表**"] + for index, media in enumerate(medias, start=1): + line = f"{index}. {media.title_year}" + if media.vote_average: + line += f" 评分:{media.vote_average}" + if media.detail_link: + line += f"\n{media.detail_link}" + lines.append(line) + return self._send_markdown(content="\n\n".join(lines), userid=userid) + + def send_torrents_msg(self, torrents: List[Context], + userid: Optional[str] = None, title: Optional[str] = None, + link: Optional[str] = None) -> Optional[bool]: + if not torrents: + return False + + lines = [f"**{title or '种子列表'}**"] + if link: + lines.append(link) + + for index, context in enumerate(torrents, start=1): + torrent = context.torrent_info + meta = MetaInfo(title=torrent.title, subtitle=torrent.description) + torrent_title = ( + f"{index}.【{torrent.site_name}】" + f"{meta.season_episode} " + f"{meta.resource_term} " + f"{meta.video_term} " + f"{meta.release_group} " + f"{StringUtils.str_filesize(torrent.size)} " + f"{torrent.volume_factor} " + f"{torrent.seeders}↑" + ) + torrent_title = re.sub(r"\s+", " ", torrent_title).strip() + if torrent.page_url: + torrent_title += f"\n{torrent.page_url}" + lines.append(torrent_title) + + return self._send_markdown(content="\n\n".join(lines), userid=userid) + + def create_menus(self, commands: Dict[str, dict]): + """ + 智能机器人模式不支持传统自建应用菜单 + """ + return + + def delete_menus(self): + """ + 智能机器人模式不支持传统自建应用菜单 + """ + return