From 8b9dc0e77fe6c4b30acfd75d84e83eb2e35d98ab Mon Sep 17 00:00:00 2001 From: EkkoG Date: Sun, 5 Apr 2026 15:17:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20QQbot=E6=B8=A0=E9=81=93?= =?UTF-8?q?=E4=BE=9D=E6=97=A7=E4=BC=9A=E9=87=8D=E5=A4=8D=E5=8F=91=E9=80=81?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/modules/qqbot/__init__.py | 1 + app/modules/qqbot/gateway.py | 20 +++++++++++--------- app/modules/qqbot/qqbot.py | 19 ++++++++++++++++--- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/app/modules/qqbot/__init__.py b/app/modules/qqbot/__init__.py index fd491937..384ecae9 100644 --- a/app/modules/qqbot/__init__.py +++ b/app/modules/qqbot/__init__.py @@ -19,6 +19,7 @@ class QQBotModule(_ModuleBase, _MessageBase[QQBot]): """QQ Bot 通知模块""" def init_module(self) -> None: + self.stop() super().init_service(service_name=QQBot.__name__.lower(), service_type=QQBot) self._channel = MessageChannel.QQ diff --git a/app/modules/qqbot/gateway.py b/app/modules/qqbot/gateway.py index 432455cf..31cbd5d5 100644 --- a/app/modules/qqbot/gateway.py +++ b/app/modules/qqbot/gateway.py @@ -6,7 +6,7 @@ QQ Bot Gateway WebSocket 客户端 import json import threading import time -from typing import Callable, Optional +from typing import Callable, List, Optional import websocket @@ -24,6 +24,7 @@ def run_gateway( get_gateway_url_fn: Callable[[str], str], on_message_fn: Callable[[dict], None], stop_event: threading.Event, + ws_holder: List, ) -> None: """ 在后台线程中运行 Gateway WebSocket 连接 @@ -34,20 +35,20 @@ def run_gateway( :param get_gateway_url_fn: 获取 gateway URL 的函数 (token) -> url :param on_message_fn: 收到消息时的回调 (payload_dict) -> None :param stop_event: 停止事件,set 时退出循环 + :param ws_holder: 调用方持有的单元素列表,存放当前 WebSocketApp,供 stop() 时 close 以打断 run_forever """ last_seq: Optional[int] = None heartbeat_interval_ms: Optional[int] = None heartbeat_timer: Optional[threading.Timer] = None - ws_ref: list = [] # 用于在闭包中保持 ws 引用 def send_heartbeat(): nonlocal heartbeat_timer if stop_event.is_set(): return try: - if ws_ref and ws_ref[0]: + if ws_holder and ws_holder[0]: payload = {"op": 1, "d": last_seq} - ws_ref[0].send(json.dumps(payload)) + ws_holder[0].send(json.dumps(payload)) logger.debug(f"[QQ Gateway:{config_name}] Heartbeat sent, seq={last_seq}") except Exception as err: logger.debug(f"[QQ Gateway:{config_name}] Heartbeat error: {err}") @@ -87,7 +88,7 @@ def run_gateway( "shard": [0, 1], }, } - ws_ref[0].send(json.dumps(identify)) + ws_holder[0].send(json.dumps(identify)) logger.info(f"[QQ Gateway:{config_name}] Identify sent") # 启动心跳 @@ -139,8 +140,8 @@ def run_gateway( elif op == 9: # Invalid Session logger.warning(f"[QQ Gateway:{config_name}] Invalid session") - if ws_ref and ws_ref[0]: - ws_ref[0].close() + if ws_holder and ws_holder[0]: + ws_holder[0].close() def on_ws_error(_, error): logger.error(f"[QQ Gateway:{config_name}] WebSocket error: {error}") @@ -149,6 +150,7 @@ def run_gateway( logger.info(f"[QQ Gateway:{config_name}] WebSocket closed: {close_status_code} {close_msg}") if heartbeat_timer: heartbeat_timer.cancel() + ws_holder.clear() reconnect_delays = [1, 2, 5, 10, 30, 60] attempt = 0 @@ -165,8 +167,8 @@ def run_gateway( on_error=on_ws_error, on_close=on_ws_close, ) - ws_ref.clear() - ws_ref.append(ws) + ws_holder.clear() + ws_holder.append(ws) # run_forever 会阻塞,需要传入 stop_event 的检查 # websocket-client 的 run_forever 支持 ping_interval, ping_timeout diff --git a/app/modules/qqbot/qqbot.py b/app/modules/qqbot/qqbot.py index 4d88a98c..22a89a3e 100644 --- a/app/modules/qqbot/qqbot.py +++ b/app/modules/qqbot/qqbot.py @@ -50,6 +50,9 @@ class QQBot: :param QQ_GROUP_OPENID: 默认群组 openid(群聊,与 QQ_OPENID 二选一) :param name: 配置名称,用于消息来源标识和 Gateway 接收 """ + self._gateway_stop = None + self._gateway_thread = None + self._gateway_ws_holder: list = [] if not QQ_APP_ID or not QQ_APP_SECRET: logger.error("QQ Bot 配置不完整:缺少 AppID 或 AppSecret") self._ready = False @@ -151,6 +154,7 @@ class QQBot: "get_gateway_url_fn": get_gateway_url, "on_message_fn": self._on_gateway_message, "stop_event": self._gateway_stop, + "ws_holder": self._gateway_ws_holder, }, daemon=True, ) @@ -161,10 +165,19 @@ class QQBot: def stop(self) -> None: """停止 Gateway 连接""" - if self._gateway_stop: + if self._gateway_stop is not None: self._gateway_stop.set() - if self._gateway_thread and self._gateway_thread.is_alive(): - self._gateway_thread.join(timeout=5) + try: + if self._gateway_ws_holder: + self._gateway_ws_holder[0].close() + except Exception as e: + logger.debug(f"QQ Bot Gateway WebSocket close: {e}") + if self._gateway_thread is not None and self._gateway_thread.is_alive(): + self._gateway_thread.join(timeout=20) + if self._gateway_thread.is_alive(): + logger.warning( + "QQ Bot Gateway 线程在 stop 后仍未退出,可能存在重复收消息,请重启进程" + ) def get_state(self) -> bool: """获取就绪状态"""