diff --git a/app/modules/qqbot/__init__.py b/app/modules/qqbot/__init__.py new file mode 100644 index 00000000..fd491937 --- /dev/null +++ b/app/modules/qqbot/__init__.py @@ -0,0 +1,180 @@ +""" +QQ Bot 通知模块 +基于 QQ 开放平台,支持主动消息推送和 Gateway 接收消息 +注意:用户/群需曾与机器人交互过才能收到主动消息,且每月有配额限制 +""" + +import json +from typing import Optional, List, Tuple, Union, Any + +from app.core.context import MediaInfo, Context +from app.log import logger +from app.modules import _ModuleBase, _MessageBase +from app.modules.qqbot.qqbot import QQBot +from app.schemas import CommingMessage, MessageChannel, Notification +from app.schemas.types import ModuleType + + +class QQBotModule(_ModuleBase, _MessageBase[QQBot]): + """QQ Bot 通知模块""" + + def init_module(self) -> None: + super().init_service(service_name=QQBot.__name__.lower(), service_type=QQBot) + self._channel = MessageChannel.QQ + + @staticmethod + def get_name() -> str: + return "QQ" + + @staticmethod + def get_type() -> ModuleType: + return ModuleType.Notification + + @staticmethod + def get_subtype() -> MessageChannel: + return MessageChannel.QQ + + @staticmethod + def get_priority() -> int: + return 10 + + def stop(self) -> None: + for client in self.get_instances().values(): + if hasattr(client, "stop"): + client.stop() + + def test(self) -> Optional[Tuple[bool, str]]: + if not self.get_instances(): + return None + for name, client in self.get_instances().items(): + if not client.get_state(): + return False, f"QQ Bot {name} 未就绪" + return True, "" + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def message_parser( + self, source: str, body: Any, form: Any, args: Any + ) -> Optional[CommingMessage]: + """ + 解析 Gateway 转发的 QQ 消息 + body 格式: {"type": "C2C_MESSAGE_CREATE"|"GROUP_AT_MESSAGE_CREATE", "content": "...", "author": {...}, "id": "...", ...} + """ + client_config = self.get_config(source) + if not client_config: + return None + try: + if isinstance(body, bytes): + msg_body = json.loads(body) + elif isinstance(body, dict): + msg_body = body + else: + return None + except (json.JSONDecodeError, TypeError) as err: + logger.debug(f"解析 QQ 消息失败: {err}") + return None + + msg_type = msg_body.get("type") + content = (msg_body.get("content") or "").strip() + if not content: + return None + + if msg_type == "C2C_MESSAGE_CREATE": + author = msg_body.get("author", {}) + user_openid = author.get("user_openid", "") + if not user_openid: + return None + logger.info(f"收到 QQ 私聊消息: userid={user_openid}, text={content[:50]}...") + return CommingMessage( + channel=MessageChannel.QQ, + source=client_config.name, + userid=user_openid, + username=user_openid, + text=content, + ) + elif msg_type == "GROUP_AT_MESSAGE_CREATE": + author = msg_body.get("author", {}) + member_openid = author.get("member_openid", "") + group_openid = msg_body.get("group_openid", "") + # 群聊用 group:group_openid 作为 userid,便于回复时识别 + userid = f"group:{group_openid}" if group_openid else member_openid + logger.info(f"收到 QQ 群消息: group={group_openid}, userid={member_openid}, text={content[:50]}...") + return CommingMessage( + channel=MessageChannel.QQ, + source=client_config.name, + userid=userid, + username=member_openid or group_openid, + text=content, + ) + return None + + def post_message(self, message: Notification, **kwargs) -> None: + for conf in self.get_configs().values(): + if not self.check_message(message, conf.name): + continue + targets = message.targets + userid = message.userid + if not userid and targets: + userid = targets.get("qq_userid") or targets.get("qq_openid") + if not userid: + userid = targets.get("qq_group_openid") or targets.get("qq_group") + if userid: + userid = f"group:{userid}" + # 无 userid 且无默认配置时,由 client 向曾发过消息的用户/群广播 + client: QQBot = self.get_instance(conf.name) + if client: + client.send_msg( + title=message.title, + text=message.text, + image=message.image, + link=message.link, + userid=userid, + targets=targets, + ) + + def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None: + for conf in self.get_configs().values(): + if not self.check_message(message, conf.name): + continue + targets = message.targets + userid = message.userid + if not userid and targets: + userid = targets.get("qq_userid") or targets.get("qq_openid") + if not userid: + g = targets.get("qq_group_openid") or targets.get("qq_group") + if g: + userid = f"group:{g}" + client: QQBot = self.get_instance(conf.name) + if client: + client.send_medias_msg( + medias=medias, + userid=userid, + title=message.title, + link=message.link, + targets=targets, + ) + + def post_torrents_message( + self, message: Notification, torrents: List[Context] + ) -> None: + for conf in self.get_configs().values(): + if not self.check_message(message, conf.name): + continue + targets = message.targets + userid = message.userid + if not userid and targets: + userid = targets.get("qq_userid") or targets.get("qq_openid") + if not userid: + g = targets.get("qq_group_openid") or targets.get("qq_group") + if g: + userid = f"group:{g}" + client: QQBot = self.get_instance(conf.name) + if client: + client.send_torrents_msg( + torrents=torrents, + userid=userid, + title=message.title, + link=message.link, + targets=targets, + ) diff --git a/app/modules/qqbot/api.py b/app/modules/qqbot/api.py new file mode 100644 index 00000000..95ce1075 --- /dev/null +++ b/app/modules/qqbot/api.py @@ -0,0 +1,193 @@ +""" +QQ Bot API - Python 实现 +参考 QQ 开放平台官方 API: https://bot.q.qq.com/wiki/develop/api/ +""" + +import time +from typing import Optional, Literal + +from app.log import logger +from app.utils.http import RequestUtils + +API_BASE = "https://api.sgroup.qq.com" +TOKEN_URL = "https://bots.qq.com/app/getAppAccessToken" + +# Token 缓存 +_cached_token: Optional[dict] = None + + +def get_access_token(app_id: str, client_secret: str) -> str: + """ + 获取 AccessToken(带缓存,提前 5 分钟刷新) + """ + global _cached_token + now_ms = int(time.time() * 1000) + if _cached_token and now_ms < _cached_token["expires_at"] - 5 * 60 * 1000 and _cached_token["app_id"] == app_id: + return _cached_token["token"] + + if _cached_token and _cached_token["app_id"] != app_id: + _cached_token = None + + try: + resp = RequestUtils(timeout=30).post_res( + TOKEN_URL, + json={"appId": app_id, "clientSecret": client_secret}, # QQ API 使用 camelCase + headers={"Content-Type": "application/json"}, + ) + if not resp or not resp.json(): + raise ValueError("Failed to get access_token: empty response") + data = resp.json() + token = data.get("access_token") + expires_in = data.get("expires_in", 7200) + if not token: + raise ValueError(f"Failed to get access_token: {data}") + + # expires_in 可能为字符串,统一转为 int + expires_in = int(expires_in) if expires_in is not None else 7200 + + _cached_token = { + "token": token, + "expires_at": now_ms + expires_in * 1000, + "app_id": app_id, + } + logger.debug(f"QQ API: Token cached for app_id={app_id}") + return token + except Exception as e: + logger.error(f"QQ API: get_access_token failed: {e}") + raise + + +def clear_token_cache() -> None: + """清除 Token 缓存""" + global _cached_token + _cached_token = None + + +def _api_request( + access_token: str, + method: str, + path: str, + body: Optional[dict] = None, + timeout: int = 30, +) -> dict: + """通用 API 请求""" + url = f"{API_BASE}{path}" + headers = { + "Authorization": f"QQBot {access_token}", + "Content-Type": "application/json", + } + try: + if method.upper() == "GET": + resp = RequestUtils(timeout=timeout).get_res(url, headers=headers) + else: + resp = RequestUtils(timeout=timeout).post_res( + url, json=body or {}, headers=headers + ) + if not resp: + raise ValueError("Empty response") + data = resp.json() + status = getattr(resp, "status_code", 0) + if status and status >= 400: + raise ValueError(f"API Error [{path}]: {data.get('message', data)}") + return data + except Exception as e: + logger.error(f"QQ API: {method} {path} failed: {e}") + raise + + +def send_proactive_c2c_message( + access_token: str, + openid: str, + content: str, +) -> dict: + """ + 主动发送 C2C 单聊消息(不需要 msg_id) + 注意:每月限 4 条/用户,且用户必须曾与机器人交互过 + """ + if not content or not content.strip(): + raise ValueError("主动消息内容不能为空") + body = {"content": content.strip(), "msg_type": 0} + return _api_request( + access_token, "POST", f"/v2/users/{openid}/messages", body + ) + + +def send_proactive_group_message( + access_token: str, + group_openid: str, + content: str, +) -> dict: + """ + 主动发送群聊消息(不需要 msg_id) + 注意:每月限 4 条/群,且群必须曾与机器人交互过 + """ + if not content or not content.strip(): + raise ValueError("主动消息内容不能为空") + body = {"content": content.strip(), "msg_type": 0} + return _api_request( + access_token, "POST", f"/v2/groups/{group_openid}/messages", body + ) + + +def send_c2c_message( + access_token: str, + openid: str, + content: str, + msg_id: Optional[str] = None, +) -> dict: + """被动回复 C2C 单聊消息(1 小时内最多 4 次)""" + body = {"content": content, "msg_type": 0, "msg_seq": 1} + if msg_id: + body["msg_id"] = msg_id + return _api_request( + access_token, "POST", f"/v2/users/{openid}/messages", body + ) + + +def send_group_message( + access_token: str, + group_openid: str, + content: str, + msg_id: Optional[str] = None, +) -> dict: + """被动回复群聊消息(1 小时内最多 4 次)""" + body = {"content": content, "msg_type": 0, "msg_seq": 1} + if msg_id: + body["msg_id"] = msg_id + return _api_request( + access_token, "POST", f"/v2/groups/{group_openid}/messages", body + ) + + +def get_gateway_url(access_token: str) -> str: + """ + 获取 WebSocket Gateway URL + """ + data = _api_request(access_token, "GET", "/gateway") + url = data.get("url") + if not url: + raise ValueError("Gateway URL not found in response") + return url + + +def send_message( + access_token: str, + target: str, + content: str, + msg_type: Literal["c2c", "group"] = "c2c", + msg_id: Optional[str] = None, +) -> dict: + """ + 统一发送接口 + :param target: openid(c2c)或 group_openid(group) + :param content: 消息内容 + :param msg_type: c2c 单聊 / group 群聊 + :param msg_id: 可选,被动回复时传入原消息 id + """ + if msg_id: + if msg_type == "c2c": + return send_c2c_message(access_token, target, content, msg_id) + return send_group_message(access_token, target, content, msg_id) + if msg_type == "c2c": + return send_proactive_c2c_message(access_token, target, content) + return send_proactive_group_message(access_token, target, content) diff --git a/app/modules/qqbot/gateway.py b/app/modules/qqbot/gateway.py new file mode 100644 index 00000000..ba43f797 --- /dev/null +++ b/app/modules/qqbot/gateway.py @@ -0,0 +1,199 @@ +""" +QQ Bot Gateway WebSocket 客户端 +连接 QQ 开放平台 Gateway,接收 C2C 和群聊消息并转发至 MP 消息链 +""" + +import json +import threading +import time +from typing import Callable, Optional + +import websocket + +from app.log import logger + +# QQ Bot intents +INTENT_GROUP_AND_C2C = 1 << 25 # 群聊和 C2C 私聊 + + +def run_gateway( + app_id: str, + app_secret: str, + config_name: str, + get_token_fn: Callable[[str, str], str], + get_gateway_url_fn: Callable[[str], str], + on_message_fn: Callable[[dict], None], + stop_event: threading.Event, +) -> None: + """ + 在后台线程中运行 Gateway WebSocket 连接 + :param app_id: QQ 机器人 AppID + :param app_secret: QQ 机器人 AppSecret + :param config_name: 配置名称,用于消息来源标识 + :param get_token_fn: 获取 access_token 的函数 (app_id, app_secret) -> token + :param get_gateway_url_fn: 获取 gateway URL 的函数 (token) -> url + :param on_message_fn: 收到消息时的回调 (payload_dict) -> None + :param stop_event: 停止事件,set 时退出循环 + """ + last_seq: Optional[int] = None + heartbeat_interval_ms: Optional[int] = None + heartbeat_timer: Optional[threading.Timer] = None + ws_ref: list = [] # 用于在闭包中保持 ws 引用 + + def send_heartbeat(): + nonlocal heartbeat_timer + if stop_event.is_set(): + return + try: + if ws_ref and ws_ref[0]: + payload = {"op": 1, "d": last_seq} + ws_ref[0].send(json.dumps(payload)) + logger.debug(f"[QQ Gateway:{config_name}] Heartbeat sent, seq={last_seq}") + except Exception as e: + logger.debug(f"[QQ Gateway:{config_name}] Heartbeat error: {e}") + if heartbeat_interval_ms and not stop_event.is_set(): + heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat) + heartbeat_timer.daemon = True + heartbeat_timer.start() + + def on_ws_message(_, message): + nonlocal last_seq, heartbeat_interval_ms, heartbeat_timer + try: + payload = json.loads(message) + except json.JSONDecodeError as e: + logger.error(f"[QQ Gateway:{config_name}] Invalid JSON: {e}") + return + + op = payload.get("op") + d = payload.get("d") + s = payload.get("s") + t = payload.get("t") + + if s is not None: + last_seq = s + + logger.debug(f"[QQ Gateway:{config_name}] op={op} t={t}") + + if op == 10: # Hello + heartbeat_interval_ms = d.get("heartbeat_interval", 30000) + logger.info(f"[QQ Gateway:{config_name}] Hello received, heartbeat_interval={heartbeat_interval_ms}") + + token = get_token_fn(app_id, app_secret) + gateway_url = get_gateway_url_fn(token) + + # Identify + identify = { + "op": 2, + "d": { + "token": f"QQBot {token}", + "intents": INTENT_GROUP_AND_C2C, + "shard": [0, 1], + }, + } + ws_ref[0].send(json.dumps(identify)) + logger.info(f"[QQ Gateway:{config_name}] Identify sent") + + # 启动心跳 + if heartbeat_timer: + heartbeat_timer.cancel() + heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat) + heartbeat_timer.daemon = True + heartbeat_timer.start() + + elif op == 0: # Dispatch + if t == "READY": + session_id = d.get("session_id", "") + logger.info(f"[QQ Gateway:{config_name}] 连接成功 Ready, session_id={session_id}") + elif t == "RESUMED": + logger.info(f"[QQ Gateway:{config_name}] 连接成功 Session resumed") + elif t == "C2C_MESSAGE_CREATE": + author = d.get("author", {}) + user_openid = author.get("user_openid", "") + content = d.get("content", "").strip() + msg_id = d.get("id", "") + if content: + on_message_fn({ + "type": "C2C_MESSAGE_CREATE", + "content": content, + "author": {"user_openid": user_openid}, + "id": msg_id, + "timestamp": d.get("timestamp", ""), + }) + elif t == "GROUP_AT_MESSAGE_CREATE": + author = d.get("author", {}) + member_openid = author.get("member_openid", "") + group_openid = d.get("group_openid", "") + content = d.get("content", "").strip() + msg_id = d.get("id", "") + if content: + on_message_fn({ + "type": "GROUP_AT_MESSAGE_CREATE", + "content": content, + "author": {"member_openid": member_openid}, + "id": msg_id, + "group_openid": group_openid, + "timestamp": d.get("timestamp", ""), + }) + # 其他事件忽略 + + elif op == 7: # Reconnect + logger.info(f"[QQ Gateway:{config_name}] Reconnect requested") + # 当前实现不自动重连,由外层循环处理 + + elif op == 9: # Invalid Session + logger.warning(f"[QQ Gateway:{config_name}] Invalid session") + if ws_ref and ws_ref[0]: + ws_ref[0].close() + + def on_ws_error(_, error): + logger.error(f"[QQ Gateway:{config_name}] WebSocket error: {error}") + + def on_ws_close(_, close_status_code, close_msg): + logger.info(f"[QQ Gateway:{config_name}] WebSocket closed: {close_status_code} {close_msg}") + if heartbeat_timer: + heartbeat_timer.cancel() + + reconnect_delays = [1, 2, 5, 10, 30, 60] + attempt = 0 + + while not stop_event.is_set(): + try: + token = get_token_fn(app_id, app_secret) + gateway_url = get_gateway_url_fn(token) + logger.info(f"[QQ Gateway:{config_name}] Connecting to {gateway_url[:60]}...") + + ws = websocket.WebSocketApp( + gateway_url, + on_message=on_ws_message, + on_error=on_ws_error, + on_close=on_ws_close, + ) + ws_ref.clear() + ws_ref.append(ws) + + # run_forever 会阻塞,需要传入 stop_event 的检查 + # websocket-client 的 run_forever 支持 ping_interval, ping_timeout + # 我们使用自定义心跳,所以不设置 ping + ws.run_forever( + ping_interval=None, + ping_timeout=None, + skip_utf8_validation=True, + ) + + except Exception as e: + logger.error(f"[QQ Gateway:{config_name}] Connection error: {e}") + + if stop_event.is_set(): + break + + delay = reconnect_delays[min(attempt, len(reconnect_delays) - 1)] + attempt += 1 + logger.info(f"[QQ Gateway:{config_name}] Reconnecting in {delay}s (attempt {attempt})") + for _ in range(delay * 10): + if stop_event.is_set(): + break + time.sleep(0.1) + + if heartbeat_timer: + heartbeat_timer.cancel() + logger.info(f"[QQ Gateway:{config_name}] Gateway thread stopped") diff --git a/app/modules/qqbot/qqbot.py b/app/modules/qqbot/qqbot.py new file mode 100644 index 00000000..0a01af9b --- /dev/null +++ b/app/modules/qqbot/qqbot.py @@ -0,0 +1,314 @@ +""" +QQ Bot 通知客户端 +基于 QQ 开放平台 API,支持主动消息推送和 Gateway 接收消息 +""" + +import hashlib +import pickle +import threading +from typing import Optional, List + +from app.chain.message import MessageChain +from app.core.cache import FileCache +from app.core.context import MediaInfo, Context +from app.core.metainfo import MetaInfo +from app.log import logger +from app.modules.qqbot.api import ( + get_access_token, + get_gateway_url, + send_proactive_c2c_message, + send_proactive_group_message, +) +from app.modules.qqbot.gateway import run_gateway +from app.utils.string import StringUtils + + +class QQBot: + """QQ Bot 通知客户端""" + + def __init__( + self, + QQ_APP_ID: Optional[str] = None, + QQ_APP_SECRET: Optional[str] = None, + QQ_OPENID: Optional[str] = None, + QQ_GROUP_OPENID: Optional[str] = None, + name: Optional[str] = None, + **kwargs, + ): + """ + 初始化 QQ Bot + :param QQ_APP_ID: QQ 机器人 AppID + :param QQ_APP_SECRET: QQ 机器人 AppSecret + :param QQ_OPENID: 默认接收者 openid(单聊) + :param QQ_GROUP_OPENID: 默认群组 openid(群聊,与 QQ_OPENID 二选一) + :param name: 配置名称,用于消息来源标识和 Gateway 接收 + """ + if not QQ_APP_ID or not QQ_APP_SECRET: + logger.error("QQ Bot 配置不完整:缺少 AppID 或 AppSecret") + self._ready = False + return + + self._app_id = QQ_APP_ID + self._app_secret = QQ_APP_SECRET + self._default_openid = QQ_OPENID + self._default_group_openid = QQ_GROUP_OPENID + self._config_name = name or "qqbot" + self._ready = True + + # 曾发过消息的用户/群,用于无默认接收者时的广播 {(target_id, is_group), ...} + self._known_targets: set = set() + _safe_name = hashlib.md5(self._config_name.encode()).hexdigest()[:12] + self._cache_key = f"__qqbot_known_targets_{_safe_name}__" + self._filecache = FileCache() + self._load_known_targets() + # 已处理的消息 ID,用于去重(避免同一条消息重复处理) + self._processed_msg_ids: set = set() + self._max_processed_ids = 1000 + + # Gateway 后台线程 + self._gateway_stop = threading.Event() + self._gateway_thread = None + self._start_gateway() + + logger.info("QQ Bot 客户端初始化完成") + + def _load_known_targets(self) -> None: + """从缓存加载曾互动的用户/群""" + try: + content = self._filecache.get(self._cache_key) + if content: + data = pickle.loads(content) + if isinstance(data, (list, set)): + self._known_targets = set(tuple(x) for x in data) + except Exception as e: + logger.debug(f"QQ Bot 加载 known_targets 失败: {e}") + + def _save_known_targets(self) -> None: + """持久化曾互动的用户/群到缓存""" + try: + self._filecache.set(self._cache_key, pickle.dumps(list(self._known_targets))) + except Exception as e: + logger.debug(f"QQ Bot 保存 known_targets 失败: {e}") + + def _forward_to_message_chain(self, payload: dict) -> None: + """直接调用消息链处理,避免 HTTP 开销""" + def _run(): + try: + MessageChain().process( + body=payload, + form={}, + args={"source": self._config_name}, + ) + except Exception as e: + logger.error(f"QQ Bot 转发消息失败: {e}") + + threading.Thread(target=_run, daemon=True).start() + + def _on_gateway_message(self, payload: dict) -> None: + """Gateway 收到消息时转发至 MP 消息链,并记录发送者用于广播""" + msg_id = payload.get("id") + if msg_id: + if msg_id in self._processed_msg_ids: + logger.debug(f"QQ Bot: 跳过重复消息 id={msg_id}") + return + self._processed_msg_ids.add(msg_id) + if len(self._processed_msg_ids) > self._max_processed_ids: + self._processed_msg_ids.clear() + + # 记录发送者,用于无默认接收者时的广播 + msg_type = payload.get("type") + if msg_type == "C2C_MESSAGE_CREATE": + openid = (payload.get("author") or {}).get("user_openid") + if openid: + self._known_targets.add((openid, False)) + self._save_known_targets() + elif msg_type == "GROUP_AT_MESSAGE_CREATE": + group_openid = payload.get("group_openid") + if group_openid: + self._known_targets.add((group_openid, True)) + self._save_known_targets() + + self._forward_to_message_chain(payload) + + def _start_gateway(self) -> None: + """启动 Gateway WebSocket 连接(后台线程)""" + try: + self._gateway_thread = threading.Thread( + target=run_gateway, + kwargs={ + "app_id": self._app_id, + "app_secret": self._app_secret, + "config_name": self._config_name, + "get_token_fn": get_access_token, + "get_gateway_url_fn": get_gateway_url, + "on_message_fn": self._on_gateway_message, + "stop_event": self._gateway_stop, + }, + daemon=True, + ) + self._gateway_thread.start() + logger.info(f"QQ Bot Gateway 已启动: {self._config_name}") + except Exception as e: + logger.error(f"QQ Bot Gateway 启动失败: {e}") + + def stop(self) -> None: + """停止 Gateway 连接""" + if self._gateway_stop: + self._gateway_stop.set() + if self._gateway_thread and self._gateway_thread.is_alive(): + self._gateway_thread.join(timeout=5) + + def get_state(self) -> bool: + """获取就绪状态""" + return self._ready + + def _get_target(self, userid: Optional[str] = None, targets: Optional[dict] = None) -> tuple: + """ + 解析发送目标 + :return: (target_id, is_group) + """ + # 优先使用 userid(可能是 openid) + if userid: + # 格式支持:group:xxx 表示群聊 + if str(userid).lower().startswith("group:"): + return userid[6:].strip(), True + return str(userid), False + + # 从 targets 获取 + if targets: + qq_openid = targets.get("qq_userid") or targets.get("qq_openid") + qq_group = targets.get("qq_group_openid") or targets.get("qq_group") + if qq_group: + return str(qq_group), True + if qq_openid: + return str(qq_openid), False + + # 使用默认配置 + if self._default_group_openid: + return self._default_group_openid, True + if self._default_openid: + return self._default_openid, False + + return None, False + + def _get_broadcast_targets(self) -> list: + """获取广播目标列表(曾发过消息的用户/群)""" + return list(self._known_targets) + + def send_msg( + self, + title: str, + text: Optional[str] = None, + image: Optional[str] = None, + link: Optional[str] = None, + userid: Optional[str] = None, + targets: Optional[dict] = None, + **kwargs, + ) -> bool: + """ + 发送 QQ 消息 + :param title: 标题 + :param text: 正文 + :param image: 图片 URL(QQ 主动消息暂不支持图片,可拼入文本) + :param link: 链接 + :param userid: 目标 openid 或 group:xxx + :param targets: 目标字典 + """ + if not self._ready: + return False + + target, is_group = self._get_target(userid, targets) + targets_to_send = [] + if target: + targets_to_send = [(target, is_group)] + else: + # 无默认接收者时,向曾发过消息的用户/群广播 + broadcast = self._get_broadcast_targets() + if broadcast: + targets_to_send = broadcast + logger.debug(f"QQ Bot: 广播模式,共 {len(targets_to_send)} 个目标") + else: + logger.warn("QQ Bot: 未指定接收者且无互动用户,请在配置中设置 QQ_OPENID/QQ_GROUP_OPENID 或先让用户发消息") + return False + + # 拼接消息内容 + parts = [] + if title: + parts.append(f"【{title}】") + if text: + parts.append(text) + if image: + parts.append(image) + if link: + parts.append(link) + content = "\n".join(parts).strip() + + if not content: + logger.warn("QQ Bot: 消息内容为空") + return False + + success_count = 0 + try: + token = get_access_token(self._app_id, self._app_secret) + for tgt, tgt_is_group in targets_to_send: + try: + if tgt_is_group: + send_proactive_group_message(token, tgt, content) + else: + send_proactive_c2c_message(token, tgt, content) + success_count += 1 + logger.debug(f"QQ Bot: 消息已发送到 {'群' if tgt_is_group else '用户'} {tgt}") + except Exception as e: + logger.error(f"QQ Bot 发送失败 ({tgt}): {e}") + return success_count > 0 + except Exception as e: + logger.error(f"QQ Bot 发送失败: {e}") + return False + + def send_medias_msg( + self, + medias: List[MediaInfo], + userid: Optional[str] = None, + title: Optional[str] = None, + link: Optional[str] = None, + **kwargs, + ) -> bool: + """发送媒体列表(转为文本)""" + if not medias: + return False + lines = [f"{i + 1}. {m.title_year} - {m.type.value}" for i, m in enumerate(medias)] + text = "\n".join(lines) + return self.send_msg( + title=title or "媒体列表", + text=text, + link=link, + userid=userid, + **kwargs, + ) + + def send_torrents_msg( + self, + torrents: List[Context], + userid: Optional[str] = None, + title: Optional[str] = None, + link: Optional[str] = None, + **kwargs, + ) -> bool: + """发送种子列表(转为文本)""" + if not torrents: + return False + lines = [] + for i, ctx in enumerate(torrents): + t = ctx.torrent_info + meta = MetaInfo(t.title, t.description) + name = f"{meta.season_episode} {meta.resource_term} {meta.video_term}" + name = " ".join(name.split()) + lines.append(f"{i + 1}.【{t.site_name}】{name} {StringUtils.str_filesize(t.size)} {t.seeders}↑") + text = "\n".join(lines) + return self.send_msg( + title=title or "种子列表", + text=text, + link=link, + userid=userid, + **kwargs, + ) diff --git a/app/schemas/message.py b/app/schemas/message.py index 70992c13..86c265b1 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -114,6 +114,8 @@ class NotificationSwitch(BaseModel): vocechat: Optional[bool] = False # WebPush开关 webpush: Optional[bool] = False + # QQ开关 + qq: Optional[bool] = False class Subscription(BaseModel): @@ -270,6 +272,15 @@ class ChannelCapabilityManager: ChannelCapability.LINKS }, fallback_enabled=True + ), + MessageChannel.QQ: ChannelCapabilities( + channel=MessageChannel.QQ, + capabilities={ + ChannelCapability.RICH_TEXT, + ChannelCapability.IMAGES, + ChannelCapability.LINKS + }, + fallback_enabled=True ) } diff --git a/app/schemas/system.py b/app/schemas/system.py index c47e8554..6b82b78a 100644 --- a/app/schemas/system.py +++ b/app/schemas/system.py @@ -65,7 +65,7 @@ class NotificationConf(BaseModel): # 名称 name: Optional[str] = None - # 类型 telegram/wechat/vocechat/synologychat/slack/webpush + # 类型 telegram/wechat/vocechat/synologychat/slack/webpush/qqbot type: Optional[str] = None # 配置 config: Optional[dict] = Field(default_factory=dict) diff --git a/app/schemas/types.py b/app/schemas/types.py index 15076470..ad0a714f 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -287,6 +287,7 @@ class MessageChannel(Enum): VoceChat = "VoceChat" Web = "Web" WebPush = "WebPush" + QQ = "QQ" # 下载器类型 diff --git a/requirements.in b/requirements.in index c929c610..b7427176 100644 --- a/requirements.in +++ b/requirements.in @@ -92,3 +92,4 @@ langchain-experimental~=0.3.4 openai~=1.108.2 google-generativeai~=0.8.5 ddgs~=9.10.0 +websocket-client~=1.8.0