diff --git a/app/modules/wechatclawbot/__init__.py b/app/modules/wechatclawbot/__init__.py index a24441d8..b2daa3a6 100644 --- a/app/modules/wechatclawbot/__init__.py +++ b/app/modules/wechatclawbot/__init__.py @@ -31,6 +31,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): @staticmethod def get_name() -> str: + """获取模块名称。""" return "微信 ClawBot" @staticmethod @@ -68,10 +69,12 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): return True, "" def init_setting(self) -> Tuple[str, Union[str, bool]]: + """初始化模块设置。""" pass @staticmethod def _load_json(body: Any) -> Optional[dict]: + """将内容解析为 JSON 字典。""" if isinstance(body, dict): payload = body elif isinstance(body, bytes): @@ -84,6 +87,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): @staticmethod def _normalize_audio_refs(audio_refs: Any) -> Optional[List[str]]: + """标准化音频引用列表。""" if not audio_refs: return None if not isinstance(audio_refs, list): @@ -93,6 +97,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): @staticmethod def _normalize_files(files: Any) -> Optional[List[CommingMessage.MessageAttachment]]: + """标准化文件附件列表。""" if not files: return None if not isinstance(files, list): diff --git a/app/modules/wechatclawbot/wechatclawbot.py b/app/modules/wechatclawbot/wechatclawbot.py index d8282c06..5022487d 100644 --- a/app/modules/wechatclawbot/wechatclawbot.py +++ b/app/modules/wechatclawbot/wechatclawbot.py @@ -83,12 +83,14 @@ class ILinkClient: account_id: Optional[str] = None, sync_buf: Optional[str] = None, ) -> None: + """更新 iLink 登录凭证与会话状态。""" self.bot_token = bot_token self.account_id = account_id if sync_buf is not None: self.sync_buf = sync_buf def _headers(self, auth_required: bool = True) -> Dict[str, str]: + """构建请求头,包含必要的身份验证信息。""" headers = { "Content-Type": "application/json", "Accept": "application/json, text/plain, */*", @@ -102,10 +104,12 @@ class ILinkClient: @staticmethod def _build_wechat_uin() -> str: + """生成一个随机的微信 UIN,用于标识请求来源。""" random_u32 = random.getrandbits(32) return base64.b64encode(str(random_u32).encode("utf-8")).decode("ascii") def _with_base_info(self, body: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """向请求体中补充基础信息(如版本号)。""" payload = dict(body or {}) base_info = payload.get("base_info") if not isinstance(base_info, dict): @@ -116,6 +120,7 @@ class ILinkClient: @staticmethod def _json(resp: Any) -> Dict[str, Any]: + """尝试将响应解析为 JSON,处理异常情况。""" if not resp: return {} try: @@ -131,6 +136,7 @@ class ILinkClient: @staticmethod def _short_text(value: Any, max_len: int = 240) -> str: + """将对象转换为简短的字符串描述,用于日志记录。""" if value is None: return "" if isinstance(value, (dict, list)): @@ -165,6 +171,7 @@ class ILinkClient: @staticmethod def _pick_value(obj: Dict[str, Any], keys: List[str]) -> Optional[Any]: + """从字典中尝试获取指定键中的第一个非空值。""" for key in keys: if key in obj and obj.get(key) not in (None, ""): return obj.get(key) @@ -187,6 +194,7 @@ class ILinkClient: def _find_first_value( cls, data: Any, keys: List[str], max_depth: int = 5 ) -> Optional[Any]: + """在嵌套结构中递归查找指定键中的第一个非空值。""" if max_depth < 0 or data is None: return None if isinstance(data, dict): @@ -208,6 +216,7 @@ class ILinkClient: def _find_first_list( cls, data: Any, prefer_keys: List[str], max_depth: int = 5 ) -> Optional[List[Any]]: + """在嵌套结构中递归查找第一个符合条件的列表。""" if max_depth < 0 or data is None: return None if isinstance(data, dict): @@ -230,6 +239,7 @@ class ILinkClient: @staticmethod def _ok(payload: Dict[str, Any]) -> bool: + """检查 API 响应是否表示成功。""" if not payload: return False code = payload.get("errcode") @@ -257,6 +267,7 @@ class ILinkClient: return str(code).strip().lower() in {"0", "ok", "success", "succeed"} def _is_send_success(self, payload: Dict[str, Any]) -> bool: + """检查 API 响应中是否包含明确的发送成功信号。""" if not payload: return False code = self._find_first_value( @@ -296,6 +307,7 @@ class ILinkClient: return False def _is_send_explicit_failure(self, payload: Dict[str, Any]) -> bool: + """检查 API 响应中是否包含明确的发送失败信号。""" if not payload: return False code = self._find_first_value( @@ -338,6 +350,7 @@ class ILinkClient: return False def _is_send_http_success(self, resp: Any, payload: Dict[str, Any]) -> bool: + """根据 HTTP 状态码和解析后的响应判断发送是否成功。""" if resp is None: return False status_code = getattr(resp, "status_code", None) @@ -355,6 +368,7 @@ class ILinkClient: @staticmethod def _build_user_candidates(to_user: str) -> List[str]: + """为目标用户 ID 构建多种格式的候选列表以提高匹配成功率。""" raw = str(to_user or "").strip() if not raw: return [] @@ -374,6 +388,7 @@ class ILinkClient: @staticmethod def _build_text_payloads(user_id: str, text: str) -> List[Dict[str, Any]]: + """构建多种格式的纯文本消息报文以兼容不同接口。""" return [ {"to_user": user_id, "msg_type": "text", "text": {"content": text}}, {"to_user": user_id, "msg_type": "text", "text": text}, @@ -385,20 +400,24 @@ class ILinkClient: @staticmethod def _aes_ecb_padded_size(plaintext_size: int) -> int: + """计算 AES ECB 加密后的填充大小。""" return ((int(plaintext_size) + 1 + 15) // 16) * 16 @staticmethod def _encrypt_aes_ecb(plaintext: bytes, key: bytes) -> bytes: + """使用 AES ECB 模式对明文进行加密。""" cipher = AES.new(key, AES.MODE_ECB) return cipher.encrypt(pad(plaintext, AES.block_size)) @staticmethod def _encode_media_aes_key(aeskey: bytes) -> str: + """将媒体 AES 密钥进行 Base64 编码。""" return base64.b64encode(aeskey.hex().encode("ascii")).decode("ascii") def _build_protocol_text_payload( self, user_id: str, text: str, context_token: Optional[str] ) -> Dict[str, Any]: + """构建协议层标准文本消息报文。""" msg = { "from_user_id": str(self.account_id or ""), "to_user_id": user_id, @@ -419,6 +438,7 @@ class ILinkClient: aeskey_b64: str, cipher_size: int, ) -> Dict[str, Any]: + """构建协议层标准图片消息报文。""" msg: Dict[str, Any] = { "from_user_id": str(self.account_id or ""), "to_user_id": user_id, @@ -455,6 +475,7 @@ class ILinkClient: mime_type: str, file_md5: str, ) -> List[Dict[str, Any]]: + """构建协议层标准文件消息报文(含多种格式候选)。""" media = { "encrypt_query_param": download_param, "aes_key": aeskey_b64, @@ -646,6 +667,7 @@ class ILinkClient: return False def get_qrcode(self) -> Dict[str, Any]: + """向服务端请求并解析二维码登录信息。""" url = f"{self.base_url}/ilink/bot/get_bot_qrcode?bot_type=3" logger.debug(f"请求二维码: {url}") resp = RequestUtils( @@ -679,6 +701,7 @@ class ILinkClient: } def get_qrcode_status(self, qrcode: str) -> Dict[str, Any]: + """查询二维码扫描状态并获取登录凭证。""" url = f"{self.base_url}/ilink/bot/get_qrcode_status" resp = RequestUtils( headers=self._headers(auth_required=False), timeout=self.timeout @@ -764,6 +787,7 @@ class ILinkClient: } def send_text(self, to_user: str, text: str, context_token: Optional[str] = None) -> bool: + """发送纯文本消息。""" if not self.bot_token: logger.warning("发送消息失败:bot token 未配置") return False @@ -785,6 +809,7 @@ class ILinkClient: text: str, context_token: Optional[str] = None, ) -> bool: + """发送包含图片和文本的消息。""" if not self.bot_token: logger.warning("发送图文失败:bot token 未配置") return False @@ -846,6 +871,7 @@ class ILinkClient: def send_image_png( self, to_user: str, image_bytes: bytes, context_token: Optional[str] = None ) -> bool: + """发送图片消息。""" if not self.bot_token: logger.warning("发送图片失败:bot token 未配置") return False @@ -890,6 +916,7 @@ class ILinkClient: mime_type: str, context_token: Optional[str] = None, ) -> bool: + """发送文件消息。""" if not self.bot_token: logger.warning("发送文件失败:bot token 未配置") return False @@ -935,6 +962,7 @@ class ILinkClient: @classmethod def _encode_ref_payload(cls, kind: str, payload: Dict[str, Any]) -> str: + """将附件元信息编码为 wxclaw:// 协议链接。""" encoded = base64.urlsafe_b64encode( json.dumps(payload, ensure_ascii=False).encode("utf-8") ).decode("ascii").rstrip("=") @@ -946,6 +974,7 @@ class ILinkClient: attachment: Dict[str, Any], default_name: Optional[str] = None, ) -> Optional[Tuple[str, Dict[str, Any]]]: + """从附件信息构建 wxclaw 协议引用。""" if not isinstance(attachment, dict): return None download_url = ( @@ -975,12 +1004,34 @@ class ILinkClient: @staticmethod def _as_scalar(value: Any) -> Optional[Any]: + """将值转换为标量,过滤掉字典和列表。""" if value in (None, ""): return None if isinstance(value, (dict, list, tuple, set)): return None return value + def _build_poll_result( + self, + success: bool, + payload: Optional[Dict[str, Any]] = None, + message: Optional[str] = None, + item_count: int = 0, + parsed_count: int = 0, + ) -> Dict[str, Any]: + """构建轮询结果摘要。""" + payload = payload or {} + resolved_message = message or self._find_first_value( + payload, ["errmsg", "message", "error", "error_msg", "detail"] + ) + return { + "success": success, + "raw": payload, + "message": self._short_text(resolved_message) if resolved_message else None, + "item_count": item_count, + "parsed_count": parsed_count, + } + def _parse_incoming(self, item: Dict[str, Any]) -> Optional[ILinkIncomingMessage]: """ 将 getupdates 返回的原始事件归一化为 MoviePilot 可消费的入站消息。 @@ -1297,26 +1348,6 @@ class ILinkClient: return False return None - def _build_poll_result( - self, - success: bool, - payload: Optional[Dict[str, Any]] = None, - message: Optional[str] = None, - item_count: int = 0, - parsed_count: int = 0, - ) -> Dict[str, Any]: - payload = payload or {} - resolved_message = message or self._find_first_value( - payload, ["errmsg", "message", "error", "error_msg", "detail"] - ) - return { - "success": success, - "raw": payload, - "message": self._short_text(resolved_message) if resolved_message else None, - "item_count": item_count, - "parsed_count": parsed_count, - } - def poll_updates( self, timeout_seconds: int = 25 ) -> Tuple[List[ILinkIncomingMessage], Optional[str], Dict[str, Any]]: @@ -1385,6 +1416,7 @@ class ILinkClient: ) def test_connection(self) -> Tuple[bool, str]: + """测试与 iLink 服务端的连接连通性。""" if not self.bot_token: return False, "未登录,缺少 bot token" url = f"{self.base_url}/ilink/bot/getconfig" @@ -1406,6 +1438,7 @@ class WechatClawBot: @classmethod def _build_cache_key(cls, config_name: str) -> str: + """根据配置名称构建缓存键。""" safe_name = hashlib.md5(str(config_name or "wechatclawbot").encode("utf-8")).hexdigest()[:12] return f"__wechatclawbot_state_{safe_name}__" @@ -1419,6 +1452,7 @@ class WechatClawBot: auto_start_polling: bool = True, **kwargs, ): + """初始化微信 ClawBot 实例及相关参数。""" self._config_name = name or "wechatclawbot" self._base_url = (WECHATCLAWBOT_BASE_URL or self._default_base_url).rstrip("/") self._default_target = (WECHATCLAWBOT_DEFAULT_TARGET or "").strip() or None @@ -1445,6 +1479,7 @@ class WechatClawBot: self._start_polling() def _load_state(self) -> Dict[str, Any]: + """从文件缓存加载登录状态。""" content = self._filecache.get(self._cache_key) if not content: return { @@ -1478,6 +1513,7 @@ class WechatClawBot: } def _save_state(self) -> None: + """将当前状态持久化到文件缓存。""" self._state["base_url"] = self._base_url self._filecache.set( self._cache_key, @@ -1495,11 +1531,13 @@ class WechatClawBot: ) def _update_state(self, **kwargs) -> None: + """更新并保存持久化状态。""" with self._lock: self._state.update(kwargs) self._save_state() def _clear_login_state(self) -> None: + """清理当前的登录状态。""" with self._lock: self._state["bot_token"] = None self._state["account_id"] = None @@ -1508,6 +1546,7 @@ class WechatClawBot: self._save_state() def _qrcode_expired(self, updated_at: Optional[int]) -> bool: + """检查二维码是否过期。""" if not updated_at: return True return int(time.time()) - int(updated_at) > self._qrcode_ttl_seconds @@ -1515,6 +1554,7 @@ class WechatClawBot: def _remember_target( self, user_id: str, username: Optional[str], context_token: Optional[str] ) -> None: + """记录已知消息目标,用于消息发送时定位。""" if not user_id: return now_ts = int(time.time()) @@ -1530,11 +1570,13 @@ class WechatClawBot: self._save_state() def _get_context_token(self, user_id: str) -> Optional[str]: + """获取特定用户的上下文 Token。""" tokens = self._state.get("user_context_tokens") or {} token = tokens.get(str(user_id)) return str(token) if token else None def _get_targets(self, userid: Optional[str] = None) -> List[str]: + """获取消息发送的目标列表。""" if userid: return [str(userid)] if self._default_target: @@ -1551,8 +1593,26 @@ class WechatClawBot: return sorted(active_targets) return sorted(known_targets.keys()) + @staticmethod + def _short_text(value: Any, max_len: int = 240) -> str: + """将内容缩短用于日志记录。""" + if value is None: + return "" + if isinstance(value, (dict, list)): + try: + text = json.dumps(value, ensure_ascii=False) + except Exception: + text = str(value) + else: + text = str(value) + text = text.replace("\n", " ").replace("\r", " ").strip() + if len(text) > max_len: + return f"{text[:max_len]}..." + return text + @staticmethod def _split_content(content: str, max_bytes: int = 3000) -> List[str]: + """将长消息内容拆分为符合大小限制的块。""" if not content: return [] chunks: List[str] = [] @@ -1566,7 +1626,7 @@ class WechatClawBot: start = 0 while start < len(encoded): end = min(start + max_bytes, len(encoded)) - while end > start and end < len(encoded) and (encoded[end] & 0xC0) == 0x80: + while start < end < len(encoded) and (encoded[end] & 0xC0) == 0x80: end -= 1 chunks.append(encoded[start:end].decode("utf-8", errors="replace").strip()) start = end @@ -1585,6 +1645,7 @@ class WechatClawBot: text: Optional[str] = None, link: Optional[str] = None, ) -> str: + """组合标题、文本和链接,生成消息内容。""" parts = [] if title: parts.append(str(title).strip()) @@ -1596,6 +1657,7 @@ class WechatClawBot: @staticmethod def _guess_mime_type(file_path: Path, file_bytes: bytes) -> str: + """猜测文件 MIME 类型。""" guessed = mimetypes.guess_type(file_path.name)[0] if guessed: return guessed @@ -1609,6 +1671,7 @@ class WechatClawBot: @staticmethod def _load_remote_image(image: str) -> Optional[bytes]: + """加载远程图片并返回二进制数据。""" image_url = str(image or "").strip() if not image_url: return None @@ -1637,15 +1700,18 @@ class WechatClawBot: return None def get_state(self) -> bool: + """获取当前登录状态。""" return bool(self._state.get("bot_token")) def stop(self) -> None: + """停止消息轮询。""" self._stop_event.set() if self._poll_thread and self._poll_thread.is_alive(): self._poll_thread.join(timeout=5) self._poll_thread = None def _start_polling(self) -> None: + """启动消息轮询线程。""" if not self._state.get("bot_token"): return if self._poll_thread and self._poll_thread.is_alive(): @@ -1695,7 +1761,7 @@ class WechatClawBot: logger.error( "转发微信 ClawBot 消息失败:" f"message_id={message.message_id}, status={response.status_code}, " - f"body={ILinkClient._short_text(response.text)}" + f"body={self._short_text(response.text)}" ) except Exception as err: logger.error( @@ -2025,18 +2091,25 @@ class WechatClawBot: if not targets: logger.warning("未找到可发送的微信 ClawBot 目标") return False - image_bytes = self._load_remote_image(image) if image else None content = self._compose_text(title=title, text=text, link=link) + # 当前 iLink 发送实现会把图文拆成两条消息发送。 + # 为避免用户看到“同一条通知被拆成文本 + 图片”两次触达,这里约定: + # 只要通知里已经有文本内容,就优先只发文本;只有纯图片通知才发送图片。 + image_bytes = self._load_remote_image(image) if image and not content else None ok = False for target in targets: context_token = self._get_context_token(target) - if image_bytes and content: - sent = self._build_client().send_image_text_png( - to_user=target, - image_bytes=image_bytes, - text=content, - context_token=context_token, - ) + if content: + client = self._build_client() + sent = True + for chunk in self._split_content(content): + if not client.send_text( + to_user=target, + text=chunk, + context_token=context_token, + ): + sent = False + break elif image_bytes: sent = self._build_client().send_image_png( to_user=target, @@ -2076,36 +2149,25 @@ class WechatClawBot: targets = self._get_targets(userid=userid) if not targets: return False - caption = self._compose_text(title=title, text=text) ok = False for target in targets: context_token = self._get_context_token(target) - sent = True - client = self._build_client() - if caption: - for chunk in self._split_content(caption): - if not client.send_text( - to_user=target, - text=chunk, - context_token=context_token, - ): - sent = False - break - if sent: - if mime_type.startswith("image/"): - sent = self._build_client().send_image_png( - to_user=target, - image_bytes=file_bytes, - context_token=context_token, - ) - else: - sent = self._build_client().send_file_bytes( - to_user=target, - file_bytes=file_bytes, - file_name=effective_name, - mime_type=mime_type, - context_token=context_token, - ) + # send_file 的主语义是发送附件本体,附加文案可以丢弃,避免出现 + # “先发一段说明,再发一个文件”的重复触达体验。 + if mime_type.startswith("image/"): + sent = self._build_client().send_image_png( + to_user=target, + image_bytes=file_bytes, + context_token=context_token, + ) + else: + sent = self._build_client().send_file_bytes( + to_user=target, + file_bytes=file_bytes, + file_name=effective_name, + mime_type=mime_type, + context_token=context_token, + ) ok = ok or bool(sent) return ok diff --git a/tests/test_wechatclawbot.py b/tests/test_wechatclawbot.py index 599fc369..416c75f5 100644 --- a/tests/test_wechatclawbot.py +++ b/tests/test_wechatclawbot.py @@ -1,4 +1,5 @@ import json +import tempfile import unittest from types import SimpleNamespace from unittest.mock import MagicMock, patch @@ -235,6 +236,126 @@ class WechatClawBotTest(unittest.TestCase): context_token=None, ) + def test_wechatclawbot_send_msg_prefers_text_when_text_and_image_coexist(self): + state = { + "bot_token": None, + "account_id": None, + "sync_buf": None, + "qrcode": {}, + "known_targets": {}, + "user_context_tokens": {}, + "base_url": "https://ilinkai.weixin.qq.com", + } + with patch.object(WechatClawBot, "_load_state", return_value=state): + bot = WechatClawBot(name="wechatclawbot-test", auto_start_polling=False) + + mock_client = MagicMock() + mock_client.send_text.return_value = True + mock_client.send_image_png.return_value = True + mock_client.send_image_text_png.return_value = True + + with ( + patch.object(bot, "_build_client", return_value=mock_client), + patch.object(bot, "_load_remote_image", return_value=b"image-bytes") as mock_load_image, + ): + result = bot.send_msg( + title="测试标题", + text="测试正文", + image="https://example.com/test.png", + userid="wxid_user_1", + ) + + self.assertTrue(result) + mock_load_image.assert_not_called() + mock_client.send_text.assert_called_once_with( + to_user="wxid_user_1", + text="测试标题\n\n测试正文", + context_token=None, + ) + mock_client.send_image_png.assert_not_called() + mock_client.send_image_text_png.assert_not_called() + + def test_wechatclawbot_send_file_prefers_image_when_image_file_has_caption(self): + state = { + "bot_token": None, + "account_id": None, + "sync_buf": None, + "qrcode": {}, + "known_targets": {}, + "user_context_tokens": {}, + "base_url": "https://ilinkai.weixin.qq.com", + } + with patch.object(WechatClawBot, "_load_state", return_value=state): + bot = WechatClawBot(name="wechatclawbot-test", auto_start_polling=False) + + mock_client = MagicMock() + mock_client.send_text.return_value = True + mock_client.send_image_png.return_value = True + + with tempfile.NamedTemporaryFile(suffix=".png") as image_file: + image_file.write(b"\x89PNG\r\n\x1a\nfake-png") + image_file.flush() + with ( + patch.object(bot, "_build_client", return_value=mock_client), + patch.object(bot, "_guess_mime_type", return_value="image/png"), + ): + result = bot.send_file( + file_path=image_file.name, + title="图片标题", + text="图片说明", + userid="wxid_user_1", + ) + + self.assertTrue(result) + mock_client.send_text.assert_not_called() + mock_client.send_image_png.assert_called_once_with( + to_user="wxid_user_1", + image_bytes=b"\x89PNG\r\n\x1a\nfake-png", + context_token=None, + ) + + def test_wechatclawbot_send_file_prefers_file_when_generic_file_has_caption(self): + state = { + "bot_token": None, + "account_id": None, + "sync_buf": None, + "qrcode": {}, + "known_targets": {}, + "user_context_tokens": {}, + "base_url": "https://ilinkai.weixin.qq.com", + } + with patch.object(WechatClawBot, "_load_state", return_value=state): + bot = WechatClawBot(name="wechatclawbot-test", auto_start_polling=False) + + mock_client = MagicMock() + mock_client.send_text.return_value = True + mock_client.send_file_bytes.return_value = True + + with tempfile.NamedTemporaryFile(suffix=".txt") as text_file: + text_file.write(b"plain-text") + text_file.flush() + with ( + patch.object(bot, "_build_client", return_value=mock_client), + patch.object(bot, "_guess_mime_type", return_value="text/plain"), + ): + result = bot.send_file( + file_path=text_file.name, + file_name="report.txt", + title="文件标题", + text="文件说明", + userid="wxid_user_1", + ) + + self.assertTrue(result) + mock_client.send_text.assert_not_called() + mock_client.send_file_bytes.assert_called_once_with( + to_user="wxid_user_1", + file_bytes=b"plain-text", + file_name="report.txt", + mime_type="text/plain", + context_token=None, + ) + if __name__ == "__main__": unittest.main()