mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-12 02:47:11 +08:00
feat: add detailed docstrings for methods in WechatClawBot and related modules
This commit is contained in:
@@ -31,6 +31,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]):
|
||||
|
||||
@staticmethod
|
||||
def get_name() -> str:
|
||||
"""获取模块名称。"""
|
||||
return "微信 ClawBot"
|
||||
|
||||
@staticmethod
|
||||
@@ -68,10 +69,12 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]):
|
||||
return True, ""
|
||||
|
||||
def init_setting(self) -> Tuple[str, Union[str, bool]]:
|
||||
"""初始化模块设置。"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _load_json(body: Any) -> Optional[dict]:
|
||||
"""将内容解析为 JSON 字典。"""
|
||||
if isinstance(body, dict):
|
||||
payload = body
|
||||
elif isinstance(body, bytes):
|
||||
@@ -84,6 +87,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]):
|
||||
|
||||
@staticmethod
|
||||
def _normalize_audio_refs(audio_refs: Any) -> Optional[List[str]]:
|
||||
"""标准化音频引用列表。"""
|
||||
if not audio_refs:
|
||||
return None
|
||||
if not isinstance(audio_refs, list):
|
||||
@@ -93,6 +97,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]):
|
||||
|
||||
@staticmethod
|
||||
def _normalize_files(files: Any) -> Optional[List[CommingMessage.MessageAttachment]]:
|
||||
"""标准化文件附件列表。"""
|
||||
if not files:
|
||||
return None
|
||||
if not isinstance(files, list):
|
||||
|
||||
@@ -83,12 +83,14 @@ class ILinkClient:
|
||||
account_id: Optional[str] = None,
|
||||
sync_buf: Optional[str] = None,
|
||||
) -> None:
|
||||
"""更新 iLink 登录凭证与会话状态。"""
|
||||
self.bot_token = bot_token
|
||||
self.account_id = account_id
|
||||
if sync_buf is not None:
|
||||
self.sync_buf = sync_buf
|
||||
|
||||
def _headers(self, auth_required: bool = True) -> Dict[str, str]:
|
||||
"""构建请求头,包含必要的身份验证信息。"""
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
@@ -102,10 +104,12 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _build_wechat_uin() -> str:
|
||||
"""生成一个随机的微信 UIN,用于标识请求来源。"""
|
||||
random_u32 = random.getrandbits(32)
|
||||
return base64.b64encode(str(random_u32).encode("utf-8")).decode("ascii")
|
||||
|
||||
def _with_base_info(self, body: Optional[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""向请求体中补充基础信息(如版本号)。"""
|
||||
payload = dict(body or {})
|
||||
base_info = payload.get("base_info")
|
||||
if not isinstance(base_info, dict):
|
||||
@@ -116,6 +120,7 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _json(resp: Any) -> Dict[str, Any]:
|
||||
"""尝试将响应解析为 JSON,处理异常情况。"""
|
||||
if not resp:
|
||||
return {}
|
||||
try:
|
||||
@@ -131,6 +136,7 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _short_text(value: Any, max_len: int = 240) -> str:
|
||||
"""将对象转换为简短的字符串描述,用于日志记录。"""
|
||||
if value is None:
|
||||
return ""
|
||||
if isinstance(value, (dict, list)):
|
||||
@@ -165,6 +171,7 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _pick_value(obj: Dict[str, Any], keys: List[str]) -> Optional[Any]:
|
||||
"""从字典中尝试获取指定键中的第一个非空值。"""
|
||||
for key in keys:
|
||||
if key in obj and obj.get(key) not in (None, ""):
|
||||
return obj.get(key)
|
||||
@@ -187,6 +194,7 @@ class ILinkClient:
|
||||
def _find_first_value(
|
||||
cls, data: Any, keys: List[str], max_depth: int = 5
|
||||
) -> Optional[Any]:
|
||||
"""在嵌套结构中递归查找指定键中的第一个非空值。"""
|
||||
if max_depth < 0 or data is None:
|
||||
return None
|
||||
if isinstance(data, dict):
|
||||
@@ -208,6 +216,7 @@ class ILinkClient:
|
||||
def _find_first_list(
|
||||
cls, data: Any, prefer_keys: List[str], max_depth: int = 5
|
||||
) -> Optional[List[Any]]:
|
||||
"""在嵌套结构中递归查找第一个符合条件的列表。"""
|
||||
if max_depth < 0 or data is None:
|
||||
return None
|
||||
if isinstance(data, dict):
|
||||
@@ -230,6 +239,7 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _ok(payload: Dict[str, Any]) -> bool:
|
||||
"""检查 API 响应是否表示成功。"""
|
||||
if not payload:
|
||||
return False
|
||||
code = payload.get("errcode")
|
||||
@@ -257,6 +267,7 @@ class ILinkClient:
|
||||
return str(code).strip().lower() in {"0", "ok", "success", "succeed"}
|
||||
|
||||
def _is_send_success(self, payload: Dict[str, Any]) -> bool:
|
||||
"""检查 API 响应中是否包含明确的发送成功信号。"""
|
||||
if not payload:
|
||||
return False
|
||||
code = self._find_first_value(
|
||||
@@ -296,6 +307,7 @@ class ILinkClient:
|
||||
return False
|
||||
|
||||
def _is_send_explicit_failure(self, payload: Dict[str, Any]) -> bool:
|
||||
"""检查 API 响应中是否包含明确的发送失败信号。"""
|
||||
if not payload:
|
||||
return False
|
||||
code = self._find_first_value(
|
||||
@@ -338,6 +350,7 @@ class ILinkClient:
|
||||
return False
|
||||
|
||||
def _is_send_http_success(self, resp: Any, payload: Dict[str, Any]) -> bool:
|
||||
"""根据 HTTP 状态码和解析后的响应判断发送是否成功。"""
|
||||
if resp is None:
|
||||
return False
|
||||
status_code = getattr(resp, "status_code", None)
|
||||
@@ -355,6 +368,7 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _build_user_candidates(to_user: str) -> List[str]:
|
||||
"""为目标用户 ID 构建多种格式的候选列表以提高匹配成功率。"""
|
||||
raw = str(to_user or "").strip()
|
||||
if not raw:
|
||||
return []
|
||||
@@ -374,6 +388,7 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _build_text_payloads(user_id: str, text: str) -> List[Dict[str, Any]]:
|
||||
"""构建多种格式的纯文本消息报文以兼容不同接口。"""
|
||||
return [
|
||||
{"to_user": user_id, "msg_type": "text", "text": {"content": text}},
|
||||
{"to_user": user_id, "msg_type": "text", "text": text},
|
||||
@@ -385,20 +400,24 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _aes_ecb_padded_size(plaintext_size: int) -> int:
|
||||
"""计算 AES ECB 加密后的填充大小。"""
|
||||
return ((int(plaintext_size) + 1 + 15) // 16) * 16
|
||||
|
||||
@staticmethod
|
||||
def _encrypt_aes_ecb(plaintext: bytes, key: bytes) -> bytes:
|
||||
"""使用 AES ECB 模式对明文进行加密。"""
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
return cipher.encrypt(pad(plaintext, AES.block_size))
|
||||
|
||||
@staticmethod
|
||||
def _encode_media_aes_key(aeskey: bytes) -> str:
|
||||
"""将媒体 AES 密钥进行 Base64 编码。"""
|
||||
return base64.b64encode(aeskey.hex().encode("ascii")).decode("ascii")
|
||||
|
||||
def _build_protocol_text_payload(
|
||||
self, user_id: str, text: str, context_token: Optional[str]
|
||||
) -> Dict[str, Any]:
|
||||
"""构建协议层标准文本消息报文。"""
|
||||
msg = {
|
||||
"from_user_id": str(self.account_id or ""),
|
||||
"to_user_id": user_id,
|
||||
@@ -419,6 +438,7 @@ class ILinkClient:
|
||||
aeskey_b64: str,
|
||||
cipher_size: int,
|
||||
) -> Dict[str, Any]:
|
||||
"""构建协议层标准图片消息报文。"""
|
||||
msg: Dict[str, Any] = {
|
||||
"from_user_id": str(self.account_id or ""),
|
||||
"to_user_id": user_id,
|
||||
@@ -455,6 +475,7 @@ class ILinkClient:
|
||||
mime_type: str,
|
||||
file_md5: str,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""构建协议层标准文件消息报文(含多种格式候选)。"""
|
||||
media = {
|
||||
"encrypt_query_param": download_param,
|
||||
"aes_key": aeskey_b64,
|
||||
@@ -646,6 +667,7 @@ class ILinkClient:
|
||||
return False
|
||||
|
||||
def get_qrcode(self) -> Dict[str, Any]:
|
||||
"""向服务端请求并解析二维码登录信息。"""
|
||||
url = f"{self.base_url}/ilink/bot/get_bot_qrcode?bot_type=3"
|
||||
logger.debug(f"请求二维码: {url}")
|
||||
resp = RequestUtils(
|
||||
@@ -679,6 +701,7 @@ class ILinkClient:
|
||||
}
|
||||
|
||||
def get_qrcode_status(self, qrcode: str) -> Dict[str, Any]:
|
||||
"""查询二维码扫描状态并获取登录凭证。"""
|
||||
url = f"{self.base_url}/ilink/bot/get_qrcode_status"
|
||||
resp = RequestUtils(
|
||||
headers=self._headers(auth_required=False), timeout=self.timeout
|
||||
@@ -764,6 +787,7 @@ class ILinkClient:
|
||||
}
|
||||
|
||||
def send_text(self, to_user: str, text: str, context_token: Optional[str] = None) -> bool:
|
||||
"""发送纯文本消息。"""
|
||||
if not self.bot_token:
|
||||
logger.warning("发送消息失败:bot token 未配置")
|
||||
return False
|
||||
@@ -785,6 +809,7 @@ class ILinkClient:
|
||||
text: str,
|
||||
context_token: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""发送包含图片和文本的消息。"""
|
||||
if not self.bot_token:
|
||||
logger.warning("发送图文失败:bot token 未配置")
|
||||
return False
|
||||
@@ -846,6 +871,7 @@ class ILinkClient:
|
||||
def send_image_png(
|
||||
self, to_user: str, image_bytes: bytes, context_token: Optional[str] = None
|
||||
) -> bool:
|
||||
"""发送图片消息。"""
|
||||
if not self.bot_token:
|
||||
logger.warning("发送图片失败:bot token 未配置")
|
||||
return False
|
||||
@@ -890,6 +916,7 @@ class ILinkClient:
|
||||
mime_type: str,
|
||||
context_token: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""发送文件消息。"""
|
||||
if not self.bot_token:
|
||||
logger.warning("发送文件失败:bot token 未配置")
|
||||
return False
|
||||
@@ -935,6 +962,7 @@ class ILinkClient:
|
||||
|
||||
@classmethod
|
||||
def _encode_ref_payload(cls, kind: str, payload: Dict[str, Any]) -> str:
|
||||
"""将附件元信息编码为 wxclaw:// 协议链接。"""
|
||||
encoded = base64.urlsafe_b64encode(
|
||||
json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
).decode("ascii").rstrip("=")
|
||||
@@ -946,6 +974,7 @@ class ILinkClient:
|
||||
attachment: Dict[str, Any],
|
||||
default_name: Optional[str] = None,
|
||||
) -> Optional[Tuple[str, Dict[str, Any]]]:
|
||||
"""从附件信息构建 wxclaw 协议引用。"""
|
||||
if not isinstance(attachment, dict):
|
||||
return None
|
||||
download_url = (
|
||||
@@ -975,12 +1004,34 @@ class ILinkClient:
|
||||
|
||||
@staticmethod
|
||||
def _as_scalar(value: Any) -> Optional[Any]:
|
||||
"""将值转换为标量,过滤掉字典和列表。"""
|
||||
if value in (None, ""):
|
||||
return None
|
||||
if isinstance(value, (dict, list, tuple, set)):
|
||||
return None
|
||||
return value
|
||||
|
||||
def _build_poll_result(
|
||||
self,
|
||||
success: bool,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
message: Optional[str] = None,
|
||||
item_count: int = 0,
|
||||
parsed_count: int = 0,
|
||||
) -> Dict[str, Any]:
|
||||
"""构建轮询结果摘要。"""
|
||||
payload = payload or {}
|
||||
resolved_message = message or self._find_first_value(
|
||||
payload, ["errmsg", "message", "error", "error_msg", "detail"]
|
||||
)
|
||||
return {
|
||||
"success": success,
|
||||
"raw": payload,
|
||||
"message": self._short_text(resolved_message) if resolved_message else None,
|
||||
"item_count": item_count,
|
||||
"parsed_count": parsed_count,
|
||||
}
|
||||
|
||||
def _parse_incoming(self, item: Dict[str, Any]) -> Optional[ILinkIncomingMessage]:
|
||||
"""
|
||||
将 getupdates 返回的原始事件归一化为 MoviePilot 可消费的入站消息。
|
||||
@@ -1297,26 +1348,6 @@ class ILinkClient:
|
||||
return False
|
||||
return None
|
||||
|
||||
def _build_poll_result(
|
||||
self,
|
||||
success: bool,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
message: Optional[str] = None,
|
||||
item_count: int = 0,
|
||||
parsed_count: int = 0,
|
||||
) -> Dict[str, Any]:
|
||||
payload = payload or {}
|
||||
resolved_message = message or self._find_first_value(
|
||||
payload, ["errmsg", "message", "error", "error_msg", "detail"]
|
||||
)
|
||||
return {
|
||||
"success": success,
|
||||
"raw": payload,
|
||||
"message": self._short_text(resolved_message) if resolved_message else None,
|
||||
"item_count": item_count,
|
||||
"parsed_count": parsed_count,
|
||||
}
|
||||
|
||||
def poll_updates(
|
||||
self, timeout_seconds: int = 25
|
||||
) -> Tuple[List[ILinkIncomingMessage], Optional[str], Dict[str, Any]]:
|
||||
@@ -1385,6 +1416,7 @@ class ILinkClient:
|
||||
)
|
||||
|
||||
def test_connection(self) -> Tuple[bool, str]:
|
||||
"""测试与 iLink 服务端的连接连通性。"""
|
||||
if not self.bot_token:
|
||||
return False, "未登录,缺少 bot token"
|
||||
url = f"{self.base_url}/ilink/bot/getconfig"
|
||||
@@ -1406,6 +1438,7 @@ class WechatClawBot:
|
||||
|
||||
@classmethod
|
||||
def _build_cache_key(cls, config_name: str) -> str:
|
||||
"""根据配置名称构建缓存键。"""
|
||||
safe_name = hashlib.md5(str(config_name or "wechatclawbot").encode("utf-8")).hexdigest()[:12]
|
||||
return f"__wechatclawbot_state_{safe_name}__"
|
||||
|
||||
@@ -1419,6 +1452,7 @@ class WechatClawBot:
|
||||
auto_start_polling: bool = True,
|
||||
**kwargs,
|
||||
):
|
||||
"""初始化微信 ClawBot 实例及相关参数。"""
|
||||
self._config_name = name or "wechatclawbot"
|
||||
self._base_url = (WECHATCLAWBOT_BASE_URL or self._default_base_url).rstrip("/")
|
||||
self._default_target = (WECHATCLAWBOT_DEFAULT_TARGET or "").strip() or None
|
||||
@@ -1445,6 +1479,7 @@ class WechatClawBot:
|
||||
self._start_polling()
|
||||
|
||||
def _load_state(self) -> Dict[str, Any]:
|
||||
"""从文件缓存加载登录状态。"""
|
||||
content = self._filecache.get(self._cache_key)
|
||||
if not content:
|
||||
return {
|
||||
@@ -1478,6 +1513,7 @@ class WechatClawBot:
|
||||
}
|
||||
|
||||
def _save_state(self) -> None:
|
||||
"""将当前状态持久化到文件缓存。"""
|
||||
self._state["base_url"] = self._base_url
|
||||
self._filecache.set(
|
||||
self._cache_key,
|
||||
@@ -1495,11 +1531,13 @@ class WechatClawBot:
|
||||
)
|
||||
|
||||
def _update_state(self, **kwargs) -> None:
|
||||
"""更新并保存持久化状态。"""
|
||||
with self._lock:
|
||||
self._state.update(kwargs)
|
||||
self._save_state()
|
||||
|
||||
def _clear_login_state(self) -> None:
|
||||
"""清理当前的登录状态。"""
|
||||
with self._lock:
|
||||
self._state["bot_token"] = None
|
||||
self._state["account_id"] = None
|
||||
@@ -1508,6 +1546,7 @@ class WechatClawBot:
|
||||
self._save_state()
|
||||
|
||||
def _qrcode_expired(self, updated_at: Optional[int]) -> bool:
|
||||
"""检查二维码是否过期。"""
|
||||
if not updated_at:
|
||||
return True
|
||||
return int(time.time()) - int(updated_at) > self._qrcode_ttl_seconds
|
||||
@@ -1515,6 +1554,7 @@ class WechatClawBot:
|
||||
def _remember_target(
|
||||
self, user_id: str, username: Optional[str], context_token: Optional[str]
|
||||
) -> None:
|
||||
"""记录已知消息目标,用于消息发送时定位。"""
|
||||
if not user_id:
|
||||
return
|
||||
now_ts = int(time.time())
|
||||
@@ -1530,11 +1570,13 @@ class WechatClawBot:
|
||||
self._save_state()
|
||||
|
||||
def _get_context_token(self, user_id: str) -> Optional[str]:
|
||||
"""获取特定用户的上下文 Token。"""
|
||||
tokens = self._state.get("user_context_tokens") or {}
|
||||
token = tokens.get(str(user_id))
|
||||
return str(token) if token else None
|
||||
|
||||
def _get_targets(self, userid: Optional[str] = None) -> List[str]:
|
||||
"""获取消息发送的目标列表。"""
|
||||
if userid:
|
||||
return [str(userid)]
|
||||
if self._default_target:
|
||||
@@ -1551,8 +1593,26 @@ class WechatClawBot:
|
||||
return sorted(active_targets)
|
||||
return sorted(known_targets.keys())
|
||||
|
||||
@staticmethod
|
||||
def _short_text(value: Any, max_len: int = 240) -> str:
|
||||
"""将内容缩短用于日志记录。"""
|
||||
if value is None:
|
||||
return ""
|
||||
if isinstance(value, (dict, list)):
|
||||
try:
|
||||
text = json.dumps(value, ensure_ascii=False)
|
||||
except Exception:
|
||||
text = str(value)
|
||||
else:
|
||||
text = str(value)
|
||||
text = text.replace("\n", " ").replace("\r", " ").strip()
|
||||
if len(text) > max_len:
|
||||
return f"{text[:max_len]}..."
|
||||
return text
|
||||
|
||||
@staticmethod
|
||||
def _split_content(content: str, max_bytes: int = 3000) -> List[str]:
|
||||
"""将长消息内容拆分为符合大小限制的块。"""
|
||||
if not content:
|
||||
return []
|
||||
chunks: List[str] = []
|
||||
@@ -1566,7 +1626,7 @@ class WechatClawBot:
|
||||
start = 0
|
||||
while start < len(encoded):
|
||||
end = min(start + max_bytes, len(encoded))
|
||||
while end > start and end < len(encoded) and (encoded[end] & 0xC0) == 0x80:
|
||||
while start < end < len(encoded) and (encoded[end] & 0xC0) == 0x80:
|
||||
end -= 1
|
||||
chunks.append(encoded[start:end].decode("utf-8", errors="replace").strip())
|
||||
start = end
|
||||
@@ -1585,6 +1645,7 @@ class WechatClawBot:
|
||||
text: Optional[str] = None,
|
||||
link: Optional[str] = None,
|
||||
) -> str:
|
||||
"""组合标题、文本和链接,生成消息内容。"""
|
||||
parts = []
|
||||
if title:
|
||||
parts.append(str(title).strip())
|
||||
@@ -1596,6 +1657,7 @@ class WechatClawBot:
|
||||
|
||||
@staticmethod
|
||||
def _guess_mime_type(file_path: Path, file_bytes: bytes) -> str:
|
||||
"""猜测文件 MIME 类型。"""
|
||||
guessed = mimetypes.guess_type(file_path.name)[0]
|
||||
if guessed:
|
||||
return guessed
|
||||
@@ -1609,6 +1671,7 @@ class WechatClawBot:
|
||||
|
||||
@staticmethod
|
||||
def _load_remote_image(image: str) -> Optional[bytes]:
|
||||
"""加载远程图片并返回二进制数据。"""
|
||||
image_url = str(image or "").strip()
|
||||
if not image_url:
|
||||
return None
|
||||
@@ -1637,15 +1700,18 @@ class WechatClawBot:
|
||||
return None
|
||||
|
||||
def get_state(self) -> bool:
|
||||
"""获取当前登录状态。"""
|
||||
return bool(self._state.get("bot_token"))
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止消息轮询。"""
|
||||
self._stop_event.set()
|
||||
if self._poll_thread and self._poll_thread.is_alive():
|
||||
self._poll_thread.join(timeout=5)
|
||||
self._poll_thread = None
|
||||
|
||||
def _start_polling(self) -> None:
|
||||
"""启动消息轮询线程。"""
|
||||
if not self._state.get("bot_token"):
|
||||
return
|
||||
if self._poll_thread and self._poll_thread.is_alive():
|
||||
@@ -1695,7 +1761,7 @@ class WechatClawBot:
|
||||
logger.error(
|
||||
"转发微信 ClawBot 消息失败:"
|
||||
f"message_id={message.message_id}, status={response.status_code}, "
|
||||
f"body={ILinkClient._short_text(response.text)}"
|
||||
f"body={self._short_text(response.text)}"
|
||||
)
|
||||
except Exception as err:
|
||||
logger.error(
|
||||
@@ -2025,18 +2091,25 @@ class WechatClawBot:
|
||||
if not targets:
|
||||
logger.warning("未找到可发送的微信 ClawBot 目标")
|
||||
return False
|
||||
image_bytes = self._load_remote_image(image) if image else None
|
||||
content = self._compose_text(title=title, text=text, link=link)
|
||||
# 当前 iLink 发送实现会把图文拆成两条消息发送。
|
||||
# 为避免用户看到“同一条通知被拆成文本 + 图片”两次触达,这里约定:
|
||||
# 只要通知里已经有文本内容,就优先只发文本;只有纯图片通知才发送图片。
|
||||
image_bytes = self._load_remote_image(image) if image and not content else None
|
||||
ok = False
|
||||
for target in targets:
|
||||
context_token = self._get_context_token(target)
|
||||
if image_bytes and content:
|
||||
sent = self._build_client().send_image_text_png(
|
||||
to_user=target,
|
||||
image_bytes=image_bytes,
|
||||
text=content,
|
||||
context_token=context_token,
|
||||
)
|
||||
if content:
|
||||
client = self._build_client()
|
||||
sent = True
|
||||
for chunk in self._split_content(content):
|
||||
if not client.send_text(
|
||||
to_user=target,
|
||||
text=chunk,
|
||||
context_token=context_token,
|
||||
):
|
||||
sent = False
|
||||
break
|
||||
elif image_bytes:
|
||||
sent = self._build_client().send_image_png(
|
||||
to_user=target,
|
||||
@@ -2076,36 +2149,25 @@ class WechatClawBot:
|
||||
targets = self._get_targets(userid=userid)
|
||||
if not targets:
|
||||
return False
|
||||
caption = self._compose_text(title=title, text=text)
|
||||
ok = False
|
||||
for target in targets:
|
||||
context_token = self._get_context_token(target)
|
||||
sent = True
|
||||
client = self._build_client()
|
||||
if caption:
|
||||
for chunk in self._split_content(caption):
|
||||
if not client.send_text(
|
||||
to_user=target,
|
||||
text=chunk,
|
||||
context_token=context_token,
|
||||
):
|
||||
sent = False
|
||||
break
|
||||
if sent:
|
||||
if mime_type.startswith("image/"):
|
||||
sent = self._build_client().send_image_png(
|
||||
to_user=target,
|
||||
image_bytes=file_bytes,
|
||||
context_token=context_token,
|
||||
)
|
||||
else:
|
||||
sent = self._build_client().send_file_bytes(
|
||||
to_user=target,
|
||||
file_bytes=file_bytes,
|
||||
file_name=effective_name,
|
||||
mime_type=mime_type,
|
||||
context_token=context_token,
|
||||
)
|
||||
# send_file 的主语义是发送附件本体,附加文案可以丢弃,避免出现
|
||||
# “先发一段说明,再发一个文件”的重复触达体验。
|
||||
if mime_type.startswith("image/"):
|
||||
sent = self._build_client().send_image_png(
|
||||
to_user=target,
|
||||
image_bytes=file_bytes,
|
||||
context_token=context_token,
|
||||
)
|
||||
else:
|
||||
sent = self._build_client().send_file_bytes(
|
||||
to_user=target,
|
||||
file_bytes=file_bytes,
|
||||
file_name=effective_name,
|
||||
mime_type=mime_type,
|
||||
context_token=context_token,
|
||||
)
|
||||
ok = ok or bool(sent)
|
||||
return ok
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
@@ -235,6 +236,126 @@ class WechatClawBotTest(unittest.TestCase):
|
||||
context_token=None,
|
||||
)
|
||||
|
||||
def test_wechatclawbot_send_msg_prefers_text_when_text_and_image_coexist(self):
|
||||
state = {
|
||||
"bot_token": None,
|
||||
"account_id": None,
|
||||
"sync_buf": None,
|
||||
"qrcode": {},
|
||||
"known_targets": {},
|
||||
"user_context_tokens": {},
|
||||
"base_url": "https://ilinkai.weixin.qq.com",
|
||||
}
|
||||
with patch.object(WechatClawBot, "_load_state", return_value=state):
|
||||
bot = WechatClawBot(name="wechatclawbot-test", auto_start_polling=False)
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.send_text.return_value = True
|
||||
mock_client.send_image_png.return_value = True
|
||||
mock_client.send_image_text_png.return_value = True
|
||||
|
||||
with (
|
||||
patch.object(bot, "_build_client", return_value=mock_client),
|
||||
patch.object(bot, "_load_remote_image", return_value=b"image-bytes") as mock_load_image,
|
||||
):
|
||||
result = bot.send_msg(
|
||||
title="测试标题",
|
||||
text="测试正文",
|
||||
image="https://example.com/test.png",
|
||||
userid="wxid_user_1",
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_load_image.assert_not_called()
|
||||
mock_client.send_text.assert_called_once_with(
|
||||
to_user="wxid_user_1",
|
||||
text="测试标题\n\n测试正文",
|
||||
context_token=None,
|
||||
)
|
||||
mock_client.send_image_png.assert_not_called()
|
||||
mock_client.send_image_text_png.assert_not_called()
|
||||
|
||||
def test_wechatclawbot_send_file_prefers_image_when_image_file_has_caption(self):
|
||||
state = {
|
||||
"bot_token": None,
|
||||
"account_id": None,
|
||||
"sync_buf": None,
|
||||
"qrcode": {},
|
||||
"known_targets": {},
|
||||
"user_context_tokens": {},
|
||||
"base_url": "https://ilinkai.weixin.qq.com",
|
||||
}
|
||||
with patch.object(WechatClawBot, "_load_state", return_value=state):
|
||||
bot = WechatClawBot(name="wechatclawbot-test", auto_start_polling=False)
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.send_text.return_value = True
|
||||
mock_client.send_image_png.return_value = True
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".png") as image_file:
|
||||
image_file.write(b"\x89PNG\r\n\x1a\nfake-png")
|
||||
image_file.flush()
|
||||
with (
|
||||
patch.object(bot, "_build_client", return_value=mock_client),
|
||||
patch.object(bot, "_guess_mime_type", return_value="image/png"),
|
||||
):
|
||||
result = bot.send_file(
|
||||
file_path=image_file.name,
|
||||
title="图片标题",
|
||||
text="图片说明",
|
||||
userid="wxid_user_1",
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_client.send_text.assert_not_called()
|
||||
mock_client.send_image_png.assert_called_once_with(
|
||||
to_user="wxid_user_1",
|
||||
image_bytes=b"\x89PNG\r\n\x1a\nfake-png",
|
||||
context_token=None,
|
||||
)
|
||||
|
||||
def test_wechatclawbot_send_file_prefers_file_when_generic_file_has_caption(self):
|
||||
state = {
|
||||
"bot_token": None,
|
||||
"account_id": None,
|
||||
"sync_buf": None,
|
||||
"qrcode": {},
|
||||
"known_targets": {},
|
||||
"user_context_tokens": {},
|
||||
"base_url": "https://ilinkai.weixin.qq.com",
|
||||
}
|
||||
with patch.object(WechatClawBot, "_load_state", return_value=state):
|
||||
bot = WechatClawBot(name="wechatclawbot-test", auto_start_polling=False)
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.send_text.return_value = True
|
||||
mock_client.send_file_bytes.return_value = True
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".txt") as text_file:
|
||||
text_file.write(b"plain-text")
|
||||
text_file.flush()
|
||||
with (
|
||||
patch.object(bot, "_build_client", return_value=mock_client),
|
||||
patch.object(bot, "_guess_mime_type", return_value="text/plain"),
|
||||
):
|
||||
result = bot.send_file(
|
||||
file_path=text_file.name,
|
||||
file_name="report.txt",
|
||||
title="文件标题",
|
||||
text="文件说明",
|
||||
userid="wxid_user_1",
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
mock_client.send_text.assert_not_called()
|
||||
mock_client.send_file_bytes.assert_called_once_with(
|
||||
to_user="wxid_user_1",
|
||||
file_bytes=b"plain-text",
|
||||
file_name="report.txt",
|
||||
mime_type="text/plain",
|
||||
context_token=None,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user