From 65c18b1d52e034b4b27ec4d739f0072397183e6f Mon Sep 17 00:00:00 2001 From: EkkoG Date: Sat, 7 Mar 2026 23:21:07 +0800 Subject: [PATCH 1/2] feat(qqbot): implement QQ Bot notification module with API and WebSocket support - Added QQ Bot notification module to facilitate proactive message sending and message reception via Gateway. - Implemented API functions for sending C2C and group messages. - Established WebSocket client for real-time message handling. - Updated requirements to include websocket-client dependency. - Enhanced schemas to support QQ channel capabilities and notification configurations. --- app/modules/qqbot/__init__.py | 180 +++++++++++++++++++ app/modules/qqbot/api.py | 193 +++++++++++++++++++++ app/modules/qqbot/gateway.py | 199 +++++++++++++++++++++ app/modules/qqbot/qqbot.py | 314 ++++++++++++++++++++++++++++++++++ app/schemas/message.py | 11 ++ app/schemas/system.py | 2 +- app/schemas/types.py | 1 + requirements.in | 1 + 8 files changed, 900 insertions(+), 1 deletion(-) create mode 100644 app/modules/qqbot/__init__.py create mode 100644 app/modules/qqbot/api.py create mode 100644 app/modules/qqbot/gateway.py create mode 100644 app/modules/qqbot/qqbot.py diff --git a/app/modules/qqbot/__init__.py b/app/modules/qqbot/__init__.py new file mode 100644 index 00000000..fd491937 --- /dev/null +++ b/app/modules/qqbot/__init__.py @@ -0,0 +1,180 @@ +""" +QQ Bot 通知模块 +基于 QQ 开放平台,支持主动消息推送和 Gateway 接收消息 +注意:用户/群需曾与机器人交互过才能收到主动消息,且每月有配额限制 +""" + +import json +from typing import Optional, List, Tuple, Union, Any + +from app.core.context import MediaInfo, Context +from app.log import logger +from app.modules import _ModuleBase, _MessageBase +from app.modules.qqbot.qqbot import QQBot +from app.schemas import CommingMessage, MessageChannel, Notification +from app.schemas.types import ModuleType + + +class QQBotModule(_ModuleBase, _MessageBase[QQBot]): + """QQ Bot 通知模块""" + + def init_module(self) -> None: + super().init_service(service_name=QQBot.__name__.lower(), service_type=QQBot) + self._channel = MessageChannel.QQ + + @staticmethod + def get_name() -> str: + return "QQ" + + @staticmethod + def get_type() -> ModuleType: + return ModuleType.Notification + + @staticmethod + def get_subtype() -> MessageChannel: + return MessageChannel.QQ + + @staticmethod + def get_priority() -> int: + return 10 + + def stop(self) -> None: + for client in self.get_instances().values(): + if hasattr(client, "stop"): + client.stop() + + def test(self) -> Optional[Tuple[bool, str]]: + if not self.get_instances(): + return None + for name, client in self.get_instances().items(): + if not client.get_state(): + return False, f"QQ Bot {name} 未就绪" + return True, "" + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def message_parser( + self, source: str, body: Any, form: Any, args: Any + ) -> Optional[CommingMessage]: + """ + 解析 Gateway 转发的 QQ 消息 + body 格式: {"type": "C2C_MESSAGE_CREATE"|"GROUP_AT_MESSAGE_CREATE", "content": "...", "author": {...}, "id": "...", ...} + """ + client_config = self.get_config(source) + if not client_config: + return None + try: + if isinstance(body, bytes): + msg_body = json.loads(body) + elif isinstance(body, dict): + msg_body = body + else: + return None + except (json.JSONDecodeError, TypeError) as err: + logger.debug(f"解析 QQ 消息失败: {err}") + return None + + msg_type = msg_body.get("type") + content = (msg_body.get("content") or "").strip() + if not content: + return None + + if msg_type == "C2C_MESSAGE_CREATE": + author = msg_body.get("author", {}) + user_openid = author.get("user_openid", "") + if not user_openid: + return None + logger.info(f"收到 QQ 私聊消息: userid={user_openid}, text={content[:50]}...") + return CommingMessage( + channel=MessageChannel.QQ, + source=client_config.name, + userid=user_openid, + username=user_openid, + text=content, + ) + elif msg_type == "GROUP_AT_MESSAGE_CREATE": + author = msg_body.get("author", {}) + member_openid = author.get("member_openid", "") + group_openid = msg_body.get("group_openid", "") + # 群聊用 group:group_openid 作为 userid,便于回复时识别 + userid = f"group:{group_openid}" if group_openid else member_openid + logger.info(f"收到 QQ 群消息: group={group_openid}, userid={member_openid}, text={content[:50]}...") + return CommingMessage( + channel=MessageChannel.QQ, + source=client_config.name, + userid=userid, + username=member_openid or group_openid, + text=content, + ) + return None + + def post_message(self, message: Notification, **kwargs) -> None: + for conf in self.get_configs().values(): + if not self.check_message(message, conf.name): + continue + targets = message.targets + userid = message.userid + if not userid and targets: + userid = targets.get("qq_userid") or targets.get("qq_openid") + if not userid: + userid = targets.get("qq_group_openid") or targets.get("qq_group") + if userid: + userid = f"group:{userid}" + # 无 userid 且无默认配置时,由 client 向曾发过消息的用户/群广播 + client: QQBot = self.get_instance(conf.name) + if client: + client.send_msg( + title=message.title, + text=message.text, + image=message.image, + link=message.link, + userid=userid, + targets=targets, + ) + + def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None: + for conf in self.get_configs().values(): + if not self.check_message(message, conf.name): + continue + targets = message.targets + userid = message.userid + if not userid and targets: + userid = targets.get("qq_userid") or targets.get("qq_openid") + if not userid: + g = targets.get("qq_group_openid") or targets.get("qq_group") + if g: + userid = f"group:{g}" + client: QQBot = self.get_instance(conf.name) + if client: + client.send_medias_msg( + medias=medias, + userid=userid, + title=message.title, + link=message.link, + targets=targets, + ) + + def post_torrents_message( + self, message: Notification, torrents: List[Context] + ) -> None: + for conf in self.get_configs().values(): + if not self.check_message(message, conf.name): + continue + targets = message.targets + userid = message.userid + if not userid and targets: + userid = targets.get("qq_userid") or targets.get("qq_openid") + if not userid: + g = targets.get("qq_group_openid") or targets.get("qq_group") + if g: + userid = f"group:{g}" + client: QQBot = self.get_instance(conf.name) + if client: + client.send_torrents_msg( + torrents=torrents, + userid=userid, + title=message.title, + link=message.link, + targets=targets, + ) diff --git a/app/modules/qqbot/api.py b/app/modules/qqbot/api.py new file mode 100644 index 00000000..95ce1075 --- /dev/null +++ b/app/modules/qqbot/api.py @@ -0,0 +1,193 @@ +""" +QQ Bot API - Python 实现 +参考 QQ 开放平台官方 API: https://bot.q.qq.com/wiki/develop/api/ +""" + +import time +from typing import Optional, Literal + +from app.log import logger +from app.utils.http import RequestUtils + +API_BASE = "https://api.sgroup.qq.com" +TOKEN_URL = "https://bots.qq.com/app/getAppAccessToken" + +# Token 缓存 +_cached_token: Optional[dict] = None + + +def get_access_token(app_id: str, client_secret: str) -> str: + """ + 获取 AccessToken(带缓存,提前 5 分钟刷新) + """ + global _cached_token + now_ms = int(time.time() * 1000) + if _cached_token and now_ms < _cached_token["expires_at"] - 5 * 60 * 1000 and _cached_token["app_id"] == app_id: + return _cached_token["token"] + + if _cached_token and _cached_token["app_id"] != app_id: + _cached_token = None + + try: + resp = RequestUtils(timeout=30).post_res( + TOKEN_URL, + json={"appId": app_id, "clientSecret": client_secret}, # QQ API 使用 camelCase + headers={"Content-Type": "application/json"}, + ) + if not resp or not resp.json(): + raise ValueError("Failed to get access_token: empty response") + data = resp.json() + token = data.get("access_token") + expires_in = data.get("expires_in", 7200) + if not token: + raise ValueError(f"Failed to get access_token: {data}") + + # expires_in 可能为字符串,统一转为 int + expires_in = int(expires_in) if expires_in is not None else 7200 + + _cached_token = { + "token": token, + "expires_at": now_ms + expires_in * 1000, + "app_id": app_id, + } + logger.debug(f"QQ API: Token cached for app_id={app_id}") + return token + except Exception as e: + logger.error(f"QQ API: get_access_token failed: {e}") + raise + + +def clear_token_cache() -> None: + """清除 Token 缓存""" + global _cached_token + _cached_token = None + + +def _api_request( + access_token: str, + method: str, + path: str, + body: Optional[dict] = None, + timeout: int = 30, +) -> dict: + """通用 API 请求""" + url = f"{API_BASE}{path}" + headers = { + "Authorization": f"QQBot {access_token}", + "Content-Type": "application/json", + } + try: + if method.upper() == "GET": + resp = RequestUtils(timeout=timeout).get_res(url, headers=headers) + else: + resp = RequestUtils(timeout=timeout).post_res( + url, json=body or {}, headers=headers + ) + if not resp: + raise ValueError("Empty response") + data = resp.json() + status = getattr(resp, "status_code", 0) + if status and status >= 400: + raise ValueError(f"API Error [{path}]: {data.get('message', data)}") + return data + except Exception as e: + logger.error(f"QQ API: {method} {path} failed: {e}") + raise + + +def send_proactive_c2c_message( + access_token: str, + openid: str, + content: str, +) -> dict: + """ + 主动发送 C2C 单聊消息(不需要 msg_id) + 注意:每月限 4 条/用户,且用户必须曾与机器人交互过 + """ + if not content or not content.strip(): + raise ValueError("主动消息内容不能为空") + body = {"content": content.strip(), "msg_type": 0} + return _api_request( + access_token, "POST", f"/v2/users/{openid}/messages", body + ) + + +def send_proactive_group_message( + access_token: str, + group_openid: str, + content: str, +) -> dict: + """ + 主动发送群聊消息(不需要 msg_id) + 注意:每月限 4 条/群,且群必须曾与机器人交互过 + """ + if not content or not content.strip(): + raise ValueError("主动消息内容不能为空") + body = {"content": content.strip(), "msg_type": 0} + return _api_request( + access_token, "POST", f"/v2/groups/{group_openid}/messages", body + ) + + +def send_c2c_message( + access_token: str, + openid: str, + content: str, + msg_id: Optional[str] = None, +) -> dict: + """被动回复 C2C 单聊消息(1 小时内最多 4 次)""" + body = {"content": content, "msg_type": 0, "msg_seq": 1} + if msg_id: + body["msg_id"] = msg_id + return _api_request( + access_token, "POST", f"/v2/users/{openid}/messages", body + ) + + +def send_group_message( + access_token: str, + group_openid: str, + content: str, + msg_id: Optional[str] = None, +) -> dict: + """被动回复群聊消息(1 小时内最多 4 次)""" + body = {"content": content, "msg_type": 0, "msg_seq": 1} + if msg_id: + body["msg_id"] = msg_id + return _api_request( + access_token, "POST", f"/v2/groups/{group_openid}/messages", body + ) + + +def get_gateway_url(access_token: str) -> str: + """ + 获取 WebSocket Gateway URL + """ + data = _api_request(access_token, "GET", "/gateway") + url = data.get("url") + if not url: + raise ValueError("Gateway URL not found in response") + return url + + +def send_message( + access_token: str, + target: str, + content: str, + msg_type: Literal["c2c", "group"] = "c2c", + msg_id: Optional[str] = None, +) -> dict: + """ + 统一发送接口 + :param target: openid(c2c)或 group_openid(group) + :param content: 消息内容 + :param msg_type: c2c 单聊 / group 群聊 + :param msg_id: 可选,被动回复时传入原消息 id + """ + if msg_id: + if msg_type == "c2c": + return send_c2c_message(access_token, target, content, msg_id) + return send_group_message(access_token, target, content, msg_id) + if msg_type == "c2c": + return send_proactive_c2c_message(access_token, target, content) + return send_proactive_group_message(access_token, target, content) diff --git a/app/modules/qqbot/gateway.py b/app/modules/qqbot/gateway.py new file mode 100644 index 00000000..ba43f797 --- /dev/null +++ b/app/modules/qqbot/gateway.py @@ -0,0 +1,199 @@ +""" +QQ Bot Gateway WebSocket 客户端 +连接 QQ 开放平台 Gateway,接收 C2C 和群聊消息并转发至 MP 消息链 +""" + +import json +import threading +import time +from typing import Callable, Optional + +import websocket + +from app.log import logger + +# QQ Bot intents +INTENT_GROUP_AND_C2C = 1 << 25 # 群聊和 C2C 私聊 + + +def run_gateway( + app_id: str, + app_secret: str, + config_name: str, + get_token_fn: Callable[[str, str], str], + get_gateway_url_fn: Callable[[str], str], + on_message_fn: Callable[[dict], None], + stop_event: threading.Event, +) -> None: + """ + 在后台线程中运行 Gateway WebSocket 连接 + :param app_id: QQ 机器人 AppID + :param app_secret: QQ 机器人 AppSecret + :param config_name: 配置名称,用于消息来源标识 + :param get_token_fn: 获取 access_token 的函数 (app_id, app_secret) -> token + :param get_gateway_url_fn: 获取 gateway URL 的函数 (token) -> url + :param on_message_fn: 收到消息时的回调 (payload_dict) -> None + :param stop_event: 停止事件,set 时退出循环 + """ + last_seq: Optional[int] = None + heartbeat_interval_ms: Optional[int] = None + heartbeat_timer: Optional[threading.Timer] = None + ws_ref: list = [] # 用于在闭包中保持 ws 引用 + + def send_heartbeat(): + nonlocal heartbeat_timer + if stop_event.is_set(): + return + try: + if ws_ref and ws_ref[0]: + payload = {"op": 1, "d": last_seq} + ws_ref[0].send(json.dumps(payload)) + logger.debug(f"[QQ Gateway:{config_name}] Heartbeat sent, seq={last_seq}") + except Exception as e: + logger.debug(f"[QQ Gateway:{config_name}] Heartbeat error: {e}") + if heartbeat_interval_ms and not stop_event.is_set(): + heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat) + heartbeat_timer.daemon = True + heartbeat_timer.start() + + def on_ws_message(_, message): + nonlocal last_seq, heartbeat_interval_ms, heartbeat_timer + try: + payload = json.loads(message) + except json.JSONDecodeError as e: + logger.error(f"[QQ Gateway:{config_name}] Invalid JSON: {e}") + return + + op = payload.get("op") + d = payload.get("d") + s = payload.get("s") + t = payload.get("t") + + if s is not None: + last_seq = s + + logger.debug(f"[QQ Gateway:{config_name}] op={op} t={t}") + + if op == 10: # Hello + heartbeat_interval_ms = d.get("heartbeat_interval", 30000) + logger.info(f"[QQ Gateway:{config_name}] Hello received, heartbeat_interval={heartbeat_interval_ms}") + + token = get_token_fn(app_id, app_secret) + gateway_url = get_gateway_url_fn(token) + + # Identify + identify = { + "op": 2, + "d": { + "token": f"QQBot {token}", + "intents": INTENT_GROUP_AND_C2C, + "shard": [0, 1], + }, + } + ws_ref[0].send(json.dumps(identify)) + logger.info(f"[QQ Gateway:{config_name}] Identify sent") + + # 启动心跳 + if heartbeat_timer: + heartbeat_timer.cancel() + heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat) + heartbeat_timer.daemon = True + heartbeat_timer.start() + + elif op == 0: # Dispatch + if t == "READY": + session_id = d.get("session_id", "") + logger.info(f"[QQ Gateway:{config_name}] 连接成功 Ready, session_id={session_id}") + elif t == "RESUMED": + logger.info(f"[QQ Gateway:{config_name}] 连接成功 Session resumed") + elif t == "C2C_MESSAGE_CREATE": + author = d.get("author", {}) + user_openid = author.get("user_openid", "") + content = d.get("content", "").strip() + msg_id = d.get("id", "") + if content: + on_message_fn({ + "type": "C2C_MESSAGE_CREATE", + "content": content, + "author": {"user_openid": user_openid}, + "id": msg_id, + "timestamp": d.get("timestamp", ""), + }) + elif t == "GROUP_AT_MESSAGE_CREATE": + author = d.get("author", {}) + member_openid = author.get("member_openid", "") + group_openid = d.get("group_openid", "") + content = d.get("content", "").strip() + msg_id = d.get("id", "") + if content: + on_message_fn({ + "type": "GROUP_AT_MESSAGE_CREATE", + "content": content, + "author": {"member_openid": member_openid}, + "id": msg_id, + "group_openid": group_openid, + "timestamp": d.get("timestamp", ""), + }) + # 其他事件忽略 + + elif op == 7: # Reconnect + logger.info(f"[QQ Gateway:{config_name}] Reconnect requested") + # 当前实现不自动重连,由外层循环处理 + + elif op == 9: # Invalid Session + logger.warning(f"[QQ Gateway:{config_name}] Invalid session") + if ws_ref and ws_ref[0]: + ws_ref[0].close() + + def on_ws_error(_, error): + logger.error(f"[QQ Gateway:{config_name}] WebSocket error: {error}") + + def on_ws_close(_, close_status_code, close_msg): + logger.info(f"[QQ Gateway:{config_name}] WebSocket closed: {close_status_code} {close_msg}") + if heartbeat_timer: + heartbeat_timer.cancel() + + reconnect_delays = [1, 2, 5, 10, 30, 60] + attempt = 0 + + while not stop_event.is_set(): + try: + token = get_token_fn(app_id, app_secret) + gateway_url = get_gateway_url_fn(token) + logger.info(f"[QQ Gateway:{config_name}] Connecting to {gateway_url[:60]}...") + + ws = websocket.WebSocketApp( + gateway_url, + on_message=on_ws_message, + on_error=on_ws_error, + on_close=on_ws_close, + ) + ws_ref.clear() + ws_ref.append(ws) + + # run_forever 会阻塞,需要传入 stop_event 的检查 + # websocket-client 的 run_forever 支持 ping_interval, ping_timeout + # 我们使用自定义心跳,所以不设置 ping + ws.run_forever( + ping_interval=None, + ping_timeout=None, + skip_utf8_validation=True, + ) + + except Exception as e: + logger.error(f"[QQ Gateway:{config_name}] Connection error: {e}") + + if stop_event.is_set(): + break + + delay = reconnect_delays[min(attempt, len(reconnect_delays) - 1)] + attempt += 1 + logger.info(f"[QQ Gateway:{config_name}] Reconnecting in {delay}s (attempt {attempt})") + for _ in range(delay * 10): + if stop_event.is_set(): + break + time.sleep(0.1) + + if heartbeat_timer: + heartbeat_timer.cancel() + logger.info(f"[QQ Gateway:{config_name}] Gateway thread stopped") diff --git a/app/modules/qqbot/qqbot.py b/app/modules/qqbot/qqbot.py new file mode 100644 index 00000000..0a01af9b --- /dev/null +++ b/app/modules/qqbot/qqbot.py @@ -0,0 +1,314 @@ +""" +QQ Bot 通知客户端 +基于 QQ 开放平台 API,支持主动消息推送和 Gateway 接收消息 +""" + +import hashlib +import pickle +import threading +from typing import Optional, List + +from app.chain.message import MessageChain +from app.core.cache import FileCache +from app.core.context import MediaInfo, Context +from app.core.metainfo import MetaInfo +from app.log import logger +from app.modules.qqbot.api import ( + get_access_token, + get_gateway_url, + send_proactive_c2c_message, + send_proactive_group_message, +) +from app.modules.qqbot.gateway import run_gateway +from app.utils.string import StringUtils + + +class QQBot: + """QQ Bot 通知客户端""" + + def __init__( + self, + QQ_APP_ID: Optional[str] = None, + QQ_APP_SECRET: Optional[str] = None, + QQ_OPENID: Optional[str] = None, + QQ_GROUP_OPENID: Optional[str] = None, + name: Optional[str] = None, + **kwargs, + ): + """ + 初始化 QQ Bot + :param QQ_APP_ID: QQ 机器人 AppID + :param QQ_APP_SECRET: QQ 机器人 AppSecret + :param QQ_OPENID: 默认接收者 openid(单聊) + :param QQ_GROUP_OPENID: 默认群组 openid(群聊,与 QQ_OPENID 二选一) + :param name: 配置名称,用于消息来源标识和 Gateway 接收 + """ + if not QQ_APP_ID or not QQ_APP_SECRET: + logger.error("QQ Bot 配置不完整:缺少 AppID 或 AppSecret") + self._ready = False + return + + self._app_id = QQ_APP_ID + self._app_secret = QQ_APP_SECRET + self._default_openid = QQ_OPENID + self._default_group_openid = QQ_GROUP_OPENID + self._config_name = name or "qqbot" + self._ready = True + + # 曾发过消息的用户/群,用于无默认接收者时的广播 {(target_id, is_group), ...} + self._known_targets: set = set() + _safe_name = hashlib.md5(self._config_name.encode()).hexdigest()[:12] + self._cache_key = f"__qqbot_known_targets_{_safe_name}__" + self._filecache = FileCache() + self._load_known_targets() + # 已处理的消息 ID,用于去重(避免同一条消息重复处理) + self._processed_msg_ids: set = set() + self._max_processed_ids = 1000 + + # Gateway 后台线程 + self._gateway_stop = threading.Event() + self._gateway_thread = None + self._start_gateway() + + logger.info("QQ Bot 客户端初始化完成") + + def _load_known_targets(self) -> None: + """从缓存加载曾互动的用户/群""" + try: + content = self._filecache.get(self._cache_key) + if content: + data = pickle.loads(content) + if isinstance(data, (list, set)): + self._known_targets = set(tuple(x) for x in data) + except Exception as e: + logger.debug(f"QQ Bot 加载 known_targets 失败: {e}") + + def _save_known_targets(self) -> None: + """持久化曾互动的用户/群到缓存""" + try: + self._filecache.set(self._cache_key, pickle.dumps(list(self._known_targets))) + except Exception as e: + logger.debug(f"QQ Bot 保存 known_targets 失败: {e}") + + def _forward_to_message_chain(self, payload: dict) -> None: + """直接调用消息链处理,避免 HTTP 开销""" + def _run(): + try: + MessageChain().process( + body=payload, + form={}, + args={"source": self._config_name}, + ) + except Exception as e: + logger.error(f"QQ Bot 转发消息失败: {e}") + + threading.Thread(target=_run, daemon=True).start() + + def _on_gateway_message(self, payload: dict) -> None: + """Gateway 收到消息时转发至 MP 消息链,并记录发送者用于广播""" + msg_id = payload.get("id") + if msg_id: + if msg_id in self._processed_msg_ids: + logger.debug(f"QQ Bot: 跳过重复消息 id={msg_id}") + return + self._processed_msg_ids.add(msg_id) + if len(self._processed_msg_ids) > self._max_processed_ids: + self._processed_msg_ids.clear() + + # 记录发送者,用于无默认接收者时的广播 + msg_type = payload.get("type") + if msg_type == "C2C_MESSAGE_CREATE": + openid = (payload.get("author") or {}).get("user_openid") + if openid: + self._known_targets.add((openid, False)) + self._save_known_targets() + elif msg_type == "GROUP_AT_MESSAGE_CREATE": + group_openid = payload.get("group_openid") + if group_openid: + self._known_targets.add((group_openid, True)) + self._save_known_targets() + + self._forward_to_message_chain(payload) + + def _start_gateway(self) -> None: + """启动 Gateway WebSocket 连接(后台线程)""" + try: + self._gateway_thread = threading.Thread( + target=run_gateway, + kwargs={ + "app_id": self._app_id, + "app_secret": self._app_secret, + "config_name": self._config_name, + "get_token_fn": get_access_token, + "get_gateway_url_fn": get_gateway_url, + "on_message_fn": self._on_gateway_message, + "stop_event": self._gateway_stop, + }, + daemon=True, + ) + self._gateway_thread.start() + logger.info(f"QQ Bot Gateway 已启动: {self._config_name}") + except Exception as e: + logger.error(f"QQ Bot Gateway 启动失败: {e}") + + def stop(self) -> None: + """停止 Gateway 连接""" + if self._gateway_stop: + self._gateway_stop.set() + if self._gateway_thread and self._gateway_thread.is_alive(): + self._gateway_thread.join(timeout=5) + + def get_state(self) -> bool: + """获取就绪状态""" + return self._ready + + def _get_target(self, userid: Optional[str] = None, targets: Optional[dict] = None) -> tuple: + """ + 解析发送目标 + :return: (target_id, is_group) + """ + # 优先使用 userid(可能是 openid) + if userid: + # 格式支持:group:xxx 表示群聊 + if str(userid).lower().startswith("group:"): + return userid[6:].strip(), True + return str(userid), False + + # 从 targets 获取 + if targets: + qq_openid = targets.get("qq_userid") or targets.get("qq_openid") + qq_group = targets.get("qq_group_openid") or targets.get("qq_group") + if qq_group: + return str(qq_group), True + if qq_openid: + return str(qq_openid), False + + # 使用默认配置 + if self._default_group_openid: + return self._default_group_openid, True + if self._default_openid: + return self._default_openid, False + + return None, False + + def _get_broadcast_targets(self) -> list: + """获取广播目标列表(曾发过消息的用户/群)""" + return list(self._known_targets) + + def send_msg( + self, + title: str, + text: Optional[str] = None, + image: Optional[str] = None, + link: Optional[str] = None, + userid: Optional[str] = None, + targets: Optional[dict] = None, + **kwargs, + ) -> bool: + """ + 发送 QQ 消息 + :param title: 标题 + :param text: 正文 + :param image: 图片 URL(QQ 主动消息暂不支持图片,可拼入文本) + :param link: 链接 + :param userid: 目标 openid 或 group:xxx + :param targets: 目标字典 + """ + if not self._ready: + return False + + target, is_group = self._get_target(userid, targets) + targets_to_send = [] + if target: + targets_to_send = [(target, is_group)] + else: + # 无默认接收者时,向曾发过消息的用户/群广播 + broadcast = self._get_broadcast_targets() + if broadcast: + targets_to_send = broadcast + logger.debug(f"QQ Bot: 广播模式,共 {len(targets_to_send)} 个目标") + else: + logger.warn("QQ Bot: 未指定接收者且无互动用户,请在配置中设置 QQ_OPENID/QQ_GROUP_OPENID 或先让用户发消息") + return False + + # 拼接消息内容 + parts = [] + if title: + parts.append(f"【{title}】") + if text: + parts.append(text) + if image: + parts.append(image) + if link: + parts.append(link) + content = "\n".join(parts).strip() + + if not content: + logger.warn("QQ Bot: 消息内容为空") + return False + + success_count = 0 + try: + token = get_access_token(self._app_id, self._app_secret) + for tgt, tgt_is_group in targets_to_send: + try: + if tgt_is_group: + send_proactive_group_message(token, tgt, content) + else: + send_proactive_c2c_message(token, tgt, content) + success_count += 1 + logger.debug(f"QQ Bot: 消息已发送到 {'群' if tgt_is_group else '用户'} {tgt}") + except Exception as e: + logger.error(f"QQ Bot 发送失败 ({tgt}): {e}") + return success_count > 0 + except Exception as e: + logger.error(f"QQ Bot 发送失败: {e}") + return False + + def send_medias_msg( + self, + medias: List[MediaInfo], + userid: Optional[str] = None, + title: Optional[str] = None, + link: Optional[str] = None, + **kwargs, + ) -> bool: + """发送媒体列表(转为文本)""" + if not medias: + return False + lines = [f"{i + 1}. {m.title_year} - {m.type.value}" for i, m in enumerate(medias)] + text = "\n".join(lines) + return self.send_msg( + title=title or "媒体列表", + text=text, + link=link, + userid=userid, + **kwargs, + ) + + def send_torrents_msg( + self, + torrents: List[Context], + userid: Optional[str] = None, + title: Optional[str] = None, + link: Optional[str] = None, + **kwargs, + ) -> bool: + """发送种子列表(转为文本)""" + if not torrents: + return False + lines = [] + for i, ctx in enumerate(torrents): + t = ctx.torrent_info + meta = MetaInfo(t.title, t.description) + name = f"{meta.season_episode} {meta.resource_term} {meta.video_term}" + name = " ".join(name.split()) + lines.append(f"{i + 1}.【{t.site_name}】{name} {StringUtils.str_filesize(t.size)} {t.seeders}↑") + text = "\n".join(lines) + return self.send_msg( + title=title or "种子列表", + text=text, + link=link, + userid=userid, + **kwargs, + ) diff --git a/app/schemas/message.py b/app/schemas/message.py index 70992c13..86c265b1 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -114,6 +114,8 @@ class NotificationSwitch(BaseModel): vocechat: Optional[bool] = False # WebPush开关 webpush: Optional[bool] = False + # QQ开关 + qq: Optional[bool] = False class Subscription(BaseModel): @@ -270,6 +272,15 @@ class ChannelCapabilityManager: ChannelCapability.LINKS }, fallback_enabled=True + ), + MessageChannel.QQ: ChannelCapabilities( + channel=MessageChannel.QQ, + capabilities={ + ChannelCapability.RICH_TEXT, + ChannelCapability.IMAGES, + ChannelCapability.LINKS + }, + fallback_enabled=True ) } diff --git a/app/schemas/system.py b/app/schemas/system.py index c47e8554..6b82b78a 100644 --- a/app/schemas/system.py +++ b/app/schemas/system.py @@ -65,7 +65,7 @@ class NotificationConf(BaseModel): # 名称 name: Optional[str] = None - # 类型 telegram/wechat/vocechat/synologychat/slack/webpush + # 类型 telegram/wechat/vocechat/synologychat/slack/webpush/qqbot type: Optional[str] = None # 配置 config: Optional[dict] = Field(default_factory=dict) diff --git a/app/schemas/types.py b/app/schemas/types.py index 15076470..ad0a714f 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -287,6 +287,7 @@ class MessageChannel(Enum): VoceChat = "VoceChat" Web = "Web" WebPush = "WebPush" + QQ = "QQ" # 下载器类型 diff --git a/requirements.in b/requirements.in index c929c610..b7427176 100644 --- a/requirements.in +++ b/requirements.in @@ -92,3 +92,4 @@ langchain-experimental~=0.3.4 openai~=1.108.2 google-generativeai~=0.8.5 ddgs~=9.10.0 +websocket-client~=1.8.0 From 425b822046f03d46171bf16e18bf4317f954b102 Mon Sep 17 00:00:00 2001 From: EkkoG Date: Sat, 7 Mar 2026 23:51:30 +0800 Subject: [PATCH 2/2] feat(qqbot): enhance message sending with Markdown support and image size detection - Added `use_markdown` parameter to `send_proactive_c2c_message` and `send_proactive_group_message` for Markdown formatting. - Implemented methods to escape Markdown characters and format messages accordingly. - Introduced image size detection for Markdown image rendering. - Updated message sending logic to fallback to plain text if Markdown is unsupported. --- app/modules/qqbot/api.py | 10 +++- app/modules/qqbot/qqbot.py | 117 +++++++++++++++++++++++++++++++------ config/app.env | 3 +- 3 files changed, 110 insertions(+), 20 deletions(-) diff --git a/app/modules/qqbot/api.py b/app/modules/qqbot/api.py index 95ce1075..143de77d 100644 --- a/app/modules/qqbot/api.py +++ b/app/modules/qqbot/api.py @@ -99,14 +99,17 @@ def send_proactive_c2c_message( access_token: str, openid: str, content: str, + use_markdown: bool = False, ) -> dict: """ 主动发送 C2C 单聊消息(不需要 msg_id) 注意:每月限 4 条/用户,且用户必须曾与机器人交互过 + :param use_markdown: 是否使用 Markdown 格式(需机器人开通 Markdown 能力) """ if not content or not content.strip(): raise ValueError("主动消息内容不能为空") - body = {"content": content.strip(), "msg_type": 0} + content = content.strip() + body = {"markdown": {"content": content}, "msg_type": 2} if use_markdown else {"content": content, "msg_type": 0} return _api_request( access_token, "POST", f"/v2/users/{openid}/messages", body ) @@ -116,14 +119,17 @@ def send_proactive_group_message( access_token: str, group_openid: str, content: str, + use_markdown: bool = False, ) -> dict: """ 主动发送群聊消息(不需要 msg_id) 注意:每月限 4 条/群,且群必须曾与机器人交互过 + :param use_markdown: 是否使用 Markdown 格式(需机器人开通 Markdown 能力) """ if not content or not content.strip(): raise ValueError("主动消息内容不能为空") - body = {"content": content.strip(), "msg_type": 0} + content = content.strip() + body = {"markdown": {"content": content}, "msg_type": 2} if use_markdown else {"content": content, "msg_type": 0} return _api_request( access_token, "POST", f"/v2/groups/{group_openid}/messages", body ) diff --git a/app/modules/qqbot/qqbot.py b/app/modules/qqbot/qqbot.py index 0a01af9b..c69aa162 100644 --- a/app/modules/qqbot/qqbot.py +++ b/app/modules/qqbot/qqbot.py @@ -4,9 +4,12 @@ QQ Bot 通知客户端 """ import hashlib +import io import pickle import threading -from typing import Optional, List +from typing import Optional, List, Tuple + +from PIL import Image from app.chain.message import MessageChain from app.core.cache import FileCache @@ -20,8 +23,12 @@ from app.modules.qqbot.api import ( send_proactive_group_message, ) from app.modules.qqbot.gateway import run_gateway +from app.utils.http import RequestUtils from app.utils.string import StringUtils +# QQ Markdown 图片默认尺寸(获取失败时使用,与 OpenClaw 对齐) +_DEFAULT_IMAGE_SIZE: Tuple[int, int] = (512, 512) + class QQBot: """QQ Bot 通知客户端""" @@ -195,6 +202,74 @@ class QQBot: """获取广播目标列表(曾发过消息的用户/群)""" return list(self._known_targets) + @staticmethod + def _get_image_size(url: str) -> Optional[Tuple[int, int]]: + """ + 从图片 URL 获取尺寸,只下载前 64KB 解析文件头(参考 OpenClaw) + :return: (width, height) 或 None + """ + try: + resp = RequestUtils(timeout=5).get_res( + url, + headers={"Range": "bytes=0-65535", "User-Agent": "QQBot-Image-Size-Detector/1.0"}, + ) + if not resp or not resp.content: + return None + data = resp.content[:65536] if len(resp.content) > 65536 else resp.content + with Image.open(io.BytesIO(data)) as img: + return (img.width, img.height) + except Exception as e: + logger.debug(f"QQ Bot 获取图片尺寸失败 ({url[:60]}...): {e}") + return None + + @staticmethod + def _escape_markdown(text: str) -> str: + """转义 Markdown 特殊字符,避免破坏格式。不转义 (),QQ 会误解析 \\( \\) 导致括号丢失或乱码""" + if not text: + return "" + text = text.replace("\\", "\\\\") + for char in ("*", "_", "[", "]", "`"): + text = text.replace(char, f"\\{char}") + return text + + @staticmethod + def _format_message_markdown( + title: Optional[str] = None, + text: Optional[str] = None, + image: Optional[str] = None, + link: Optional[str] = None, + ) -> tuple: + """ + 将消息格式化为 QQ Markdown,类似 Telegram 处理方式 + :return: (content, use_markdown) + """ + parts = [] + if title: + # 标题加粗,移除可能破坏格式的换行 + safe_title = (title or "").replace("\n", " ").strip() + if safe_title: + parts.append(f"**{QQBot._escape_markdown(safe_title)}**") + if text: + parts.append(QQBot._escape_markdown((text or "").strip())) + if image: + # QQ Markdown 图片需带尺寸才能正确渲染,格式: ![#宽px #高px](url),否则会显示为 [图片] 文本 + # 参考 OpenClaw,先获取图片真实尺寸,失败则用默认 512x512 + img_url = (image or "").strip() + if img_url and (img_url.startswith("http://") or img_url.startswith("https://")): + size = QQBot._get_image_size(img_url) + w, h = size if size else _DEFAULT_IMAGE_SIZE + if size: + logger.debug(f"QQ Bot 图片尺寸: {w}x{h} - {img_url[:60]}...") + parts.append(f"![#{w}px #{h}px]({img_url})") + elif img_url: + parts.append(img_url) + if link: + link_url = (link or "").strip() + if link_url: + parts.append(f"[查看详情]({link_url})") + content = "\n\n".join(p for p in parts if p).strip() + return content, bool(content) + def send_msg( self, title: str, @@ -231,17 +306,9 @@ class QQBot: logger.warn("QQ Bot: 未指定接收者且无互动用户,请在配置中设置 QQ_OPENID/QQ_GROUP_OPENID 或先让用户发消息") return False - # 拼接消息内容 - parts = [] - if title: - parts.append(f"【{title}】") - if text: - parts.append(text) - if image: - parts.append(image) - if link: - parts.append(link) - content = "\n".join(parts).strip() + # 使用 Markdown 格式发送(类似 Telegram) + content, use_markdown = self._format_message_markdown(title=title, text=text, image=image, link=link) + logger.info(f"QQ Bot 发送内容 (use_markdown={use_markdown}):\n{content}") if not content: logger.warn("QQ Bot: 消息内容为空") @@ -252,14 +319,30 @@ class QQBot: token = get_access_token(self._app_id, self._app_secret) for tgt, tgt_is_group in targets_to_send: try: - if tgt_is_group: - send_proactive_group_message(token, tgt, content) - else: - send_proactive_c2c_message(token, tgt, content) + send_fn = send_proactive_group_message if tgt_is_group else send_proactive_c2c_message + send_fn(token, tgt, content, use_markdown=use_markdown) success_count += 1 logger.debug(f"QQ Bot: 消息已发送到 {'群' if tgt_is_group else '用户'} {tgt}") except Exception as e: - logger.error(f"QQ Bot 发送失败 ({tgt}): {e}") + err_msg = str(e) + if use_markdown and ("markdown" in err_msg.lower() or "11244" in err_msg or "权限" in err_msg): + # Markdown 未开通时回退为纯文本 + plain_parts = [] + if title: + plain_parts.append(f"【{title}】") + if text: + plain_parts.append(text) + if image: + plain_parts.append(image) + if link: + plain_parts.append(link) + plain_content = "\n".join(plain_parts).strip() + if plain_content: + send_fn(token, tgt, plain_content, use_markdown=False) + success_count += 1 + logger.debug(f"QQ Bot: Markdown 不可用,已回退纯文本发送至 {tgt}") + else: + logger.error(f"QQ Bot 发送失败 ({tgt}): {e}") return success_count > 0 except Exception as e: logger.error(f"QQ Bot 发送失败: {e}") diff --git a/config/app.env b/config/app.env index 1e38b700..ecee59b7 100644 --- a/config/app.env +++ b/config/app.env @@ -1 +1,2 @@ -# MoviePilot V2版本,大部分设置可通过后台设置界面进行配置,仅个别配置需要通过环境变量或本配置文件配置,所有可配置项参考:https://wiki.movie-pilot.org/zh/configuration \ No newline at end of file +# MoviePilot V2版本,大部分设置可通过后台设置界面进行配置,仅个别配置需要通过环境变量或本配置文件配置,所有可配置项参考:https://wiki.movie-pilot.org/zh/configuration +API_TOKEN='8xKVMvGB6xgI0EctObr48or8fdb5Zwm0'