""" QQ Bot Gateway WebSocket 客户端 连接 QQ 开放平台 Gateway,接收 C2C 和群聊消息并转发至 MP 消息链 """ import json import threading import time from typing import Callable, Optional import websocket from app.log import logger # QQ Bot intents INTENT_GROUP_AND_C2C = 1 << 25 # 群聊和 C2C 私聊 def run_gateway( app_id: str, app_secret: str, config_name: str, get_token_fn: Callable[[str, str], str], get_gateway_url_fn: Callable[[str], str], on_message_fn: Callable[[dict], None], stop_event: threading.Event, ) -> None: """ 在后台线程中运行 Gateway WebSocket 连接 :param app_id: QQ 机器人 AppID :param app_secret: QQ 机器人 AppSecret :param config_name: 配置名称,用于消息来源标识 :param get_token_fn: 获取 access_token 的函数 (app_id, app_secret) -> token :param get_gateway_url_fn: 获取 gateway URL 的函数 (token) -> url :param on_message_fn: 收到消息时的回调 (payload_dict) -> None :param stop_event: 停止事件,set 时退出循环 """ last_seq: Optional[int] = None heartbeat_interval_ms: Optional[int] = None heartbeat_timer: Optional[threading.Timer] = None ws_ref: list = [] # 用于在闭包中保持 ws 引用 def send_heartbeat(): nonlocal heartbeat_timer if stop_event.is_set(): return try: if ws_ref and ws_ref[0]: payload = {"op": 1, "d": last_seq} ws_ref[0].send(json.dumps(payload)) logger.debug(f"[QQ Gateway:{config_name}] Heartbeat sent, seq={last_seq}") except Exception as e: logger.debug(f"[QQ Gateway:{config_name}] Heartbeat error: {e}") if heartbeat_interval_ms and not stop_event.is_set(): heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat) heartbeat_timer.daemon = True heartbeat_timer.start() def on_ws_message(_, message): nonlocal last_seq, heartbeat_interval_ms, heartbeat_timer try: payload = json.loads(message) except json.JSONDecodeError as e: logger.error(f"[QQ Gateway:{config_name}] Invalid JSON: {e}") return op = payload.get("op") d = payload.get("d") s = payload.get("s") t = payload.get("t") if s is not None: last_seq = s logger.debug(f"[QQ Gateway:{config_name}] op={op} t={t}") if op == 10: # Hello heartbeat_interval_ms = d.get("heartbeat_interval", 30000) logger.info(f"[QQ Gateway:{config_name}] Hello received, heartbeat_interval={heartbeat_interval_ms}") token = get_token_fn(app_id, app_secret) gateway_url = get_gateway_url_fn(token) # Identify identify = { "op": 2, "d": { "token": f"QQBot {token}", "intents": INTENT_GROUP_AND_C2C, "shard": [0, 1], }, } ws_ref[0].send(json.dumps(identify)) logger.info(f"[QQ Gateway:{config_name}] Identify sent") # 启动心跳 if heartbeat_timer: heartbeat_timer.cancel() heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat) heartbeat_timer.daemon = True heartbeat_timer.start() elif op == 0: # Dispatch if t == "READY": session_id = d.get("session_id", "") logger.info(f"[QQ Gateway:{config_name}] 连接成功 Ready, session_id={session_id}") elif t == "RESUMED": logger.info(f"[QQ Gateway:{config_name}] 连接成功 Session resumed") elif t == "C2C_MESSAGE_CREATE": author = d.get("author", {}) user_openid = author.get("user_openid", "") content = d.get("content", "").strip() msg_id = d.get("id", "") if content: on_message_fn({ "type": "C2C_MESSAGE_CREATE", "content": content, "author": {"user_openid": user_openid}, "id": msg_id, "timestamp": d.get("timestamp", ""), }) elif t == "GROUP_AT_MESSAGE_CREATE": author = d.get("author", {}) member_openid = author.get("member_openid", "") group_openid = d.get("group_openid", "") content = d.get("content", "").strip() msg_id = d.get("id", "") if content: on_message_fn({ "type": "GROUP_AT_MESSAGE_CREATE", "content": content, "author": {"member_openid": member_openid}, "id": msg_id, "group_openid": group_openid, "timestamp": d.get("timestamp", ""), }) # 其他事件忽略 elif op == 7: # Reconnect logger.info(f"[QQ Gateway:{config_name}] Reconnect requested") # 当前实现不自动重连,由外层循环处理 elif op == 9: # Invalid Session logger.warning(f"[QQ Gateway:{config_name}] Invalid session") if ws_ref and ws_ref[0]: ws_ref[0].close() def on_ws_error(_, error): logger.error(f"[QQ Gateway:{config_name}] WebSocket error: {error}") def on_ws_close(_, close_status_code, close_msg): logger.info(f"[QQ Gateway:{config_name}] WebSocket closed: {close_status_code} {close_msg}") if heartbeat_timer: heartbeat_timer.cancel() reconnect_delays = [1, 2, 5, 10, 30, 60] attempt = 0 while not stop_event.is_set(): try: token = get_token_fn(app_id, app_secret) gateway_url = get_gateway_url_fn(token) logger.info(f"[QQ Gateway:{config_name}] Connecting to {gateway_url[:60]}...") ws = websocket.WebSocketApp( gateway_url, on_message=on_ws_message, on_error=on_ws_error, on_close=on_ws_close, ) ws_ref.clear() ws_ref.append(ws) # run_forever 会阻塞,需要传入 stop_event 的检查 # websocket-client 的 run_forever 支持 ping_interval, ping_timeout # 我们使用自定义心跳,所以不设置 ping ws.run_forever( ping_interval=None, ping_timeout=None, skip_utf8_validation=True, ) except Exception as e: logger.error(f"[QQ Gateway:{config_name}] Connection error: {e}") if stop_event.is_set(): break delay = reconnect_delays[min(attempt, len(reconnect_delays) - 1)] attempt += 1 logger.info(f"[QQ Gateway:{config_name}] Reconnecting in {delay}s (attempt {attempt})") for _ in range(delay * 10): if stop_event.is_set(): break time.sleep(0.1) if heartbeat_timer: heartbeat_timer.cancel() logger.info(f"[QQ Gateway:{config_name}] Gateway thread stopped")