feat: enhance iLink polling logic to support multiple payload formats and improve success determination

This commit is contained in:
jxxghp
2026-05-11 08:02:17 +08:00
parent 3776422634
commit 3c9228c2f8
2 changed files with 54 additions and 11 deletions

View File

@@ -1238,23 +1238,32 @@ class ILinkClient:
def _extract_updates(
self, payload: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""按官方 getupdates 协议提取顶层 msgs 与 get_updates_buf。"""
sync_buf = self._pick_present_value(payload, ["get_updates_buf"])
"""
提取轮询结果中的消息列表与游标。
线上存在两种等价字段命名:较新的实现返回 `get_updates_buf`
部分实例仍然返回 `sync_buf`。两者都表示下一轮轮询应携带的游标。
"""
sync_buf = self._pick_present_value(
payload, ["get_updates_buf", "sync_buf", "syncBuf"]
)
items = payload.get("msgs")
if isinstance(items, list):
return items, sync_buf
return [], sync_buf
def _has_canonical_poll_shape(self, payload: Dict[str, Any]) -> bool:
@staticmethod
def _has_canonical_poll_shape(payload: Dict[str, Any]) -> bool:
"""官方响应至少应包含顶层 msgs 列表。"""
return isinstance(payload.get("msgs"), list)
def _is_poll_success(self, payload: Dict[str, Any]) -> bool:
def _resolve_poll_success(self, payload: Dict[str, Any]) -> Optional[bool]:
"""
判断 getupdates 是否明确成功。
判断 getupdates 是否给出了明确成功/失败信号
轮询接口不能沿用“只要没有明显报错就算成功”的宽松策略,否则服务端返回旧消息列表、
但状态码其实失败时,会被误判为可消费响应,导致旧消息再次进入业务链路。
返回 `None` 表示响应里没有显式状态,需要交给协议结构继续判断。
"""
if not payload:
return False
@@ -1286,7 +1295,7 @@ class ILinkClient:
return True
if lowered in {"failed", "fail", "error", "denied", "blocked"}:
return False
return False
return None
def _build_poll_result(
self,
@@ -1320,16 +1329,22 @@ class ILinkClient:
if not self.bot_token:
return [], self.sync_buf, {"success": False, "message": "bot token 未配置"}
url = f"{self.base_url}/ilink/bot/getupdates"
payload = {}
request_body = self._with_base_info({"get_updates_buf": self.sync_buf or ""})
resp = RequestUtils(
headers=self._headers(auth_required=True),
timeout=timeout_seconds + 10,
).post(url, json=request_body)
payload = self._json(resp)
success = bool(payload and self._is_poll_success(payload))
explicit_success = self._resolve_poll_success(payload)
has_canonical_shape = self._has_canonical_poll_shape(payload)
# 某些 iLink 部署不会返回 ret/success但顶层 msgs + sync_buf 已经足够表明
# 这是一次有效的轮询响应;只有出现显式失败信号时才应拒绝消费。
success = bool(payload) and (
explicit_success is True
or (explicit_success is None and has_canonical_shape)
)
last_message = None
if payload and not success:
if payload and explicit_success is False:
last_message = self._find_first_value(
payload, ["errmsg", "message", "error", "error_msg", "detail"]
) or self._short_text(payload)
@@ -1344,7 +1359,7 @@ class ILinkClient:
payload=payload,
message=last_message or "轮询响应未明确成功",
)
if not self._has_canonical_poll_shape(payload):
if not has_canonical_shape:
logger.warning(
"getupdates 返回非官方结构,已拒绝消费: %s",
self._short_text(payload),
@@ -1592,7 +1607,8 @@ class WechatClawBot:
return "image/gif"
return "application/octet-stream"
def _load_remote_image(self, image: str) -> Optional[bytes]:
@staticmethod
def _load_remote_image(image: str) -> Optional[bytes]:
image_url = str(image or "").strip()
if not image_url:
return None

View File

@@ -108,6 +108,33 @@ class WechatClawBotTest(unittest.TestCase):
self.assertEqual(client.sync_buf, "")
self.assertEqual(len(messages), 1)
def test_ilink_poll_updates_accepts_canonical_payload_without_explicit_success_flag(self):
client = ILinkClient(
base_url="https://ilinkai.weixin.qq.com",
bot_token="token",
sync_buf="cursor-old",
)
response = MagicMock()
response.json.return_value = {
"sync_buf": "cursor-new",
"msgs": [
{
"message_id": "msg-1002",
"from_user_id": "wxid_user_2",
"item_list": [{"type": 1, "text_item": {"text": "收到"}}],
}
],
}
with patch("app.modules.wechatclawbot.wechatclawbot.RequestUtils.post", return_value=response):
messages, sync_buf, result = client.poll_updates()
self.assertTrue(result["success"])
self.assertEqual(sync_buf, "cursor-new")
self.assertEqual(client.sync_buf, "cursor-new")
self.assertEqual(len(messages), 1)
self.assertEqual(messages[0].text, "收到")
def test_ilink_poll_updates_rejects_noncanonical_nested_success_payload(self):
client = ILinkClient(
base_url="https://ilinkai.weixin.qq.com",