feat(qqbot): implement QQ Bot notification module with API and WebSocket support

- Added QQ Bot notification module to facilitate proactive message sending and message reception via Gateway.
- Implemented API functions for sending C2C and group messages.
- Established WebSocket client for real-time message handling.
- Updated requirements to include websocket-client dependency.
- Enhanced schemas to support QQ channel capabilities and notification configurations.
This commit is contained in:
EkkoG
2026-03-07 23:21:07 +08:00
parent 1bddf3daa7
commit 65c18b1d52
8 changed files with 900 additions and 1 deletions

View File

@@ -0,0 +1,180 @@
"""
QQ Bot 通知模块
基于 QQ 开放平台,支持主动消息推送和 Gateway 接收消息
注意:用户/群需曾与机器人交互过才能收到主动消息,且每月有配额限制
"""
import json
from typing import Optional, List, Tuple, Union, Any
from app.core.context import MediaInfo, Context
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.qqbot.qqbot import QQBot
from app.schemas import CommingMessage, MessageChannel, Notification
from app.schemas.types import ModuleType
class QQBotModule(_ModuleBase, _MessageBase[QQBot]):
"""QQ Bot 通知模块"""
def init_module(self) -> None:
super().init_service(service_name=QQBot.__name__.lower(), service_type=QQBot)
self._channel = MessageChannel.QQ
@staticmethod
def get_name() -> str:
return "QQ"
@staticmethod
def get_type() -> ModuleType:
return ModuleType.Notification
@staticmethod
def get_subtype() -> MessageChannel:
return MessageChannel.QQ
@staticmethod
def get_priority() -> int:
return 10
def stop(self) -> None:
for client in self.get_instances().values():
if hasattr(client, "stop"):
client.stop()
def test(self) -> Optional[Tuple[bool, str]]:
if not self.get_instances():
return None
for name, client in self.get_instances().items():
if not client.get_state():
return False, f"QQ Bot {name} 未就绪"
return True, ""
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def message_parser(
self, source: str, body: Any, form: Any, args: Any
) -> Optional[CommingMessage]:
"""
解析 Gateway 转发的 QQ 消息
body 格式: {"type": "C2C_MESSAGE_CREATE"|"GROUP_AT_MESSAGE_CREATE", "content": "...", "author": {...}, "id": "...", ...}
"""
client_config = self.get_config(source)
if not client_config:
return None
try:
if isinstance(body, bytes):
msg_body = json.loads(body)
elif isinstance(body, dict):
msg_body = body
else:
return None
except (json.JSONDecodeError, TypeError) as err:
logger.debug(f"解析 QQ 消息失败: {err}")
return None
msg_type = msg_body.get("type")
content = (msg_body.get("content") or "").strip()
if not content:
return None
if msg_type == "C2C_MESSAGE_CREATE":
author = msg_body.get("author", {})
user_openid = author.get("user_openid", "")
if not user_openid:
return None
logger.info(f"收到 QQ 私聊消息: userid={user_openid}, text={content[:50]}...")
return CommingMessage(
channel=MessageChannel.QQ,
source=client_config.name,
userid=user_openid,
username=user_openid,
text=content,
)
elif msg_type == "GROUP_AT_MESSAGE_CREATE":
author = msg_body.get("author", {})
member_openid = author.get("member_openid", "")
group_openid = msg_body.get("group_openid", "")
# 群聊用 group:group_openid 作为 userid便于回复时识别
userid = f"group:{group_openid}" if group_openid else member_openid
logger.info(f"收到 QQ 群消息: group={group_openid}, userid={member_openid}, text={content[:50]}...")
return CommingMessage(
channel=MessageChannel.QQ,
source=client_config.name,
userid=userid,
username=member_openid or group_openid,
text=content,
)
return None
def post_message(self, message: Notification, **kwargs) -> None:
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
userid = message.userid
if not userid and targets:
userid = targets.get("qq_userid") or targets.get("qq_openid")
if not userid:
userid = targets.get("qq_group_openid") or targets.get("qq_group")
if userid:
userid = f"group:{userid}"
# 无 userid 且无默认配置时,由 client 向曾发过消息的用户/群广播
client: QQBot = self.get_instance(conf.name)
if client:
client.send_msg(
title=message.title,
text=message.text,
image=message.image,
link=message.link,
userid=userid,
targets=targets,
)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
userid = message.userid
if not userid and targets:
userid = targets.get("qq_userid") or targets.get("qq_openid")
if not userid:
g = targets.get("qq_group_openid") or targets.get("qq_group")
if g:
userid = f"group:{g}"
client: QQBot = self.get_instance(conf.name)
if client:
client.send_medias_msg(
medias=medias,
userid=userid,
title=message.title,
link=message.link,
targets=targets,
)
def post_torrents_message(
self, message: Notification, torrents: List[Context]
) -> None:
for conf in self.get_configs().values():
if not self.check_message(message, conf.name):
continue
targets = message.targets
userid = message.userid
if not userid and targets:
userid = targets.get("qq_userid") or targets.get("qq_openid")
if not userid:
g = targets.get("qq_group_openid") or targets.get("qq_group")
if g:
userid = f"group:{g}"
client: QQBot = self.get_instance(conf.name)
if client:
client.send_torrents_msg(
torrents=torrents,
userid=userid,
title=message.title,
link=message.link,
targets=targets,
)

193
app/modules/qqbot/api.py Normal file
View File

@@ -0,0 +1,193 @@
"""
QQ Bot API - Python 实现
参考 QQ 开放平台官方 API: https://bot.q.qq.com/wiki/develop/api/
"""
import time
from typing import Optional, Literal
from app.log import logger
from app.utils.http import RequestUtils
API_BASE = "https://api.sgroup.qq.com"
TOKEN_URL = "https://bots.qq.com/app/getAppAccessToken"
# Token 缓存
_cached_token: Optional[dict] = None
def get_access_token(app_id: str, client_secret: str) -> str:
"""
获取 AccessToken带缓存提前 5 分钟刷新)
"""
global _cached_token
now_ms = int(time.time() * 1000)
if _cached_token and now_ms < _cached_token["expires_at"] - 5 * 60 * 1000 and _cached_token["app_id"] == app_id:
return _cached_token["token"]
if _cached_token and _cached_token["app_id"] != app_id:
_cached_token = None
try:
resp = RequestUtils(timeout=30).post_res(
TOKEN_URL,
json={"appId": app_id, "clientSecret": client_secret}, # QQ API 使用 camelCase
headers={"Content-Type": "application/json"},
)
if not resp or not resp.json():
raise ValueError("Failed to get access_token: empty response")
data = resp.json()
token = data.get("access_token")
expires_in = data.get("expires_in", 7200)
if not token:
raise ValueError(f"Failed to get access_token: {data}")
# expires_in 可能为字符串,统一转为 int
expires_in = int(expires_in) if expires_in is not None else 7200
_cached_token = {
"token": token,
"expires_at": now_ms + expires_in * 1000,
"app_id": app_id,
}
logger.debug(f"QQ API: Token cached for app_id={app_id}")
return token
except Exception as e:
logger.error(f"QQ API: get_access_token failed: {e}")
raise
def clear_token_cache() -> None:
"""清除 Token 缓存"""
global _cached_token
_cached_token = None
def _api_request(
access_token: str,
method: str,
path: str,
body: Optional[dict] = None,
timeout: int = 30,
) -> dict:
"""通用 API 请求"""
url = f"{API_BASE}{path}"
headers = {
"Authorization": f"QQBot {access_token}",
"Content-Type": "application/json",
}
try:
if method.upper() == "GET":
resp = RequestUtils(timeout=timeout).get_res(url, headers=headers)
else:
resp = RequestUtils(timeout=timeout).post_res(
url, json=body or {}, headers=headers
)
if not resp:
raise ValueError("Empty response")
data = resp.json()
status = getattr(resp, "status_code", 0)
if status and status >= 400:
raise ValueError(f"API Error [{path}]: {data.get('message', data)}")
return data
except Exception as e:
logger.error(f"QQ API: {method} {path} failed: {e}")
raise
def send_proactive_c2c_message(
access_token: str,
openid: str,
content: str,
) -> dict:
"""
主动发送 C2C 单聊消息(不需要 msg_id
注意:每月限 4 条/用户,且用户必须曾与机器人交互过
"""
if not content or not content.strip():
raise ValueError("主动消息内容不能为空")
body = {"content": content.strip(), "msg_type": 0}
return _api_request(
access_token, "POST", f"/v2/users/{openid}/messages", body
)
def send_proactive_group_message(
access_token: str,
group_openid: str,
content: str,
) -> dict:
"""
主动发送群聊消息(不需要 msg_id
注意:每月限 4 条/群,且群必须曾与机器人交互过
"""
if not content or not content.strip():
raise ValueError("主动消息内容不能为空")
body = {"content": content.strip(), "msg_type": 0}
return _api_request(
access_token, "POST", f"/v2/groups/{group_openid}/messages", body
)
def send_c2c_message(
access_token: str,
openid: str,
content: str,
msg_id: Optional[str] = None,
) -> dict:
"""被动回复 C2C 单聊消息1 小时内最多 4 次)"""
body = {"content": content, "msg_type": 0, "msg_seq": 1}
if msg_id:
body["msg_id"] = msg_id
return _api_request(
access_token, "POST", f"/v2/users/{openid}/messages", body
)
def send_group_message(
access_token: str,
group_openid: str,
content: str,
msg_id: Optional[str] = None,
) -> dict:
"""被动回复群聊消息1 小时内最多 4 次)"""
body = {"content": content, "msg_type": 0, "msg_seq": 1}
if msg_id:
body["msg_id"] = msg_id
return _api_request(
access_token, "POST", f"/v2/groups/{group_openid}/messages", body
)
def get_gateway_url(access_token: str) -> str:
"""
获取 WebSocket Gateway URL
"""
data = _api_request(access_token, "GET", "/gateway")
url = data.get("url")
if not url:
raise ValueError("Gateway URL not found in response")
return url
def send_message(
access_token: str,
target: str,
content: str,
msg_type: Literal["c2c", "group"] = "c2c",
msg_id: Optional[str] = None,
) -> dict:
"""
统一发送接口
:param target: openidc2c或 group_openidgroup
:param content: 消息内容
:param msg_type: c2c 单聊 / group 群聊
:param msg_id: 可选,被动回复时传入原消息 id
"""
if msg_id:
if msg_type == "c2c":
return send_c2c_message(access_token, target, content, msg_id)
return send_group_message(access_token, target, content, msg_id)
if msg_type == "c2c":
return send_proactive_c2c_message(access_token, target, content)
return send_proactive_group_message(access_token, target, content)

View File

@@ -0,0 +1,199 @@
"""
QQ Bot Gateway WebSocket 客户端
连接 QQ 开放平台 Gateway接收 C2C 和群聊消息并转发至 MP 消息链
"""
import json
import threading
import time
from typing import Callable, Optional
import websocket
from app.log import logger
# QQ Bot intents
INTENT_GROUP_AND_C2C = 1 << 25 # 群聊和 C2C 私聊
def run_gateway(
app_id: str,
app_secret: str,
config_name: str,
get_token_fn: Callable[[str, str], str],
get_gateway_url_fn: Callable[[str], str],
on_message_fn: Callable[[dict], None],
stop_event: threading.Event,
) -> None:
"""
在后台线程中运行 Gateway WebSocket 连接
:param app_id: QQ 机器人 AppID
:param app_secret: QQ 机器人 AppSecret
:param config_name: 配置名称,用于消息来源标识
:param get_token_fn: 获取 access_token 的函数 (app_id, app_secret) -> token
:param get_gateway_url_fn: 获取 gateway URL 的函数 (token) -> url
:param on_message_fn: 收到消息时的回调 (payload_dict) -> None
:param stop_event: 停止事件set 时退出循环
"""
last_seq: Optional[int] = None
heartbeat_interval_ms: Optional[int] = None
heartbeat_timer: Optional[threading.Timer] = None
ws_ref: list = [] # 用于在闭包中保持 ws 引用
def send_heartbeat():
nonlocal heartbeat_timer
if stop_event.is_set():
return
try:
if ws_ref and ws_ref[0]:
payload = {"op": 1, "d": last_seq}
ws_ref[0].send(json.dumps(payload))
logger.debug(f"[QQ Gateway:{config_name}] Heartbeat sent, seq={last_seq}")
except Exception as e:
logger.debug(f"[QQ Gateway:{config_name}] Heartbeat error: {e}")
if heartbeat_interval_ms and not stop_event.is_set():
heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat)
heartbeat_timer.daemon = True
heartbeat_timer.start()
def on_ws_message(_, message):
nonlocal last_seq, heartbeat_interval_ms, heartbeat_timer
try:
payload = json.loads(message)
except json.JSONDecodeError as e:
logger.error(f"[QQ Gateway:{config_name}] Invalid JSON: {e}")
return
op = payload.get("op")
d = payload.get("d")
s = payload.get("s")
t = payload.get("t")
if s is not None:
last_seq = s
logger.debug(f"[QQ Gateway:{config_name}] op={op} t={t}")
if op == 10: # Hello
heartbeat_interval_ms = d.get("heartbeat_interval", 30000)
logger.info(f"[QQ Gateway:{config_name}] Hello received, heartbeat_interval={heartbeat_interval_ms}")
token = get_token_fn(app_id, app_secret)
gateway_url = get_gateway_url_fn(token)
# Identify
identify = {
"op": 2,
"d": {
"token": f"QQBot {token}",
"intents": INTENT_GROUP_AND_C2C,
"shard": [0, 1],
},
}
ws_ref[0].send(json.dumps(identify))
logger.info(f"[QQ Gateway:{config_name}] Identify sent")
# 启动心跳
if heartbeat_timer:
heartbeat_timer.cancel()
heartbeat_timer = threading.Timer(heartbeat_interval_ms / 1000.0, send_heartbeat)
heartbeat_timer.daemon = True
heartbeat_timer.start()
elif op == 0: # Dispatch
if t == "READY":
session_id = d.get("session_id", "")
logger.info(f"[QQ Gateway:{config_name}] 连接成功 Ready, session_id={session_id}")
elif t == "RESUMED":
logger.info(f"[QQ Gateway:{config_name}] 连接成功 Session resumed")
elif t == "C2C_MESSAGE_CREATE":
author = d.get("author", {})
user_openid = author.get("user_openid", "")
content = d.get("content", "").strip()
msg_id = d.get("id", "")
if content:
on_message_fn({
"type": "C2C_MESSAGE_CREATE",
"content": content,
"author": {"user_openid": user_openid},
"id": msg_id,
"timestamp": d.get("timestamp", ""),
})
elif t == "GROUP_AT_MESSAGE_CREATE":
author = d.get("author", {})
member_openid = author.get("member_openid", "")
group_openid = d.get("group_openid", "")
content = d.get("content", "").strip()
msg_id = d.get("id", "")
if content:
on_message_fn({
"type": "GROUP_AT_MESSAGE_CREATE",
"content": content,
"author": {"member_openid": member_openid},
"id": msg_id,
"group_openid": group_openid,
"timestamp": d.get("timestamp", ""),
})
# 其他事件忽略
elif op == 7: # Reconnect
logger.info(f"[QQ Gateway:{config_name}] Reconnect requested")
# 当前实现不自动重连,由外层循环处理
elif op == 9: # Invalid Session
logger.warning(f"[QQ Gateway:{config_name}] Invalid session")
if ws_ref and ws_ref[0]:
ws_ref[0].close()
def on_ws_error(_, error):
logger.error(f"[QQ Gateway:{config_name}] WebSocket error: {error}")
def on_ws_close(_, close_status_code, close_msg):
logger.info(f"[QQ Gateway:{config_name}] WebSocket closed: {close_status_code} {close_msg}")
if heartbeat_timer:
heartbeat_timer.cancel()
reconnect_delays = [1, 2, 5, 10, 30, 60]
attempt = 0
while not stop_event.is_set():
try:
token = get_token_fn(app_id, app_secret)
gateway_url = get_gateway_url_fn(token)
logger.info(f"[QQ Gateway:{config_name}] Connecting to {gateway_url[:60]}...")
ws = websocket.WebSocketApp(
gateway_url,
on_message=on_ws_message,
on_error=on_ws_error,
on_close=on_ws_close,
)
ws_ref.clear()
ws_ref.append(ws)
# run_forever 会阻塞,需要传入 stop_event 的检查
# websocket-client 的 run_forever 支持 ping_interval, ping_timeout
# 我们使用自定义心跳,所以不设置 ping
ws.run_forever(
ping_interval=None,
ping_timeout=None,
skip_utf8_validation=True,
)
except Exception as e:
logger.error(f"[QQ Gateway:{config_name}] Connection error: {e}")
if stop_event.is_set():
break
delay = reconnect_delays[min(attempt, len(reconnect_delays) - 1)]
attempt += 1
logger.info(f"[QQ Gateway:{config_name}] Reconnecting in {delay}s (attempt {attempt})")
for _ in range(delay * 10):
if stop_event.is_set():
break
time.sleep(0.1)
if heartbeat_timer:
heartbeat_timer.cancel()
logger.info(f"[QQ Gateway:{config_name}] Gateway thread stopped")

314
app/modules/qqbot/qqbot.py Normal file
View File

@@ -0,0 +1,314 @@
"""
QQ Bot 通知客户端
基于 QQ 开放平台 API支持主动消息推送和 Gateway 接收消息
"""
import hashlib
import pickle
import threading
from typing import Optional, List
from app.chain.message import MessageChain
from app.core.cache import FileCache
from app.core.context import MediaInfo, Context
from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules.qqbot.api import (
get_access_token,
get_gateway_url,
send_proactive_c2c_message,
send_proactive_group_message,
)
from app.modules.qqbot.gateway import run_gateway
from app.utils.string import StringUtils
class QQBot:
"""QQ Bot 通知客户端"""
def __init__(
self,
QQ_APP_ID: Optional[str] = None,
QQ_APP_SECRET: Optional[str] = None,
QQ_OPENID: Optional[str] = None,
QQ_GROUP_OPENID: Optional[str] = None,
name: Optional[str] = None,
**kwargs,
):
"""
初始化 QQ Bot
:param QQ_APP_ID: QQ 机器人 AppID
:param QQ_APP_SECRET: QQ 机器人 AppSecret
:param QQ_OPENID: 默认接收者 openid单聊
:param QQ_GROUP_OPENID: 默认群组 openid群聊与 QQ_OPENID 二选一)
:param name: 配置名称,用于消息来源标识和 Gateway 接收
"""
if not QQ_APP_ID or not QQ_APP_SECRET:
logger.error("QQ Bot 配置不完整:缺少 AppID 或 AppSecret")
self._ready = False
return
self._app_id = QQ_APP_ID
self._app_secret = QQ_APP_SECRET
self._default_openid = QQ_OPENID
self._default_group_openid = QQ_GROUP_OPENID
self._config_name = name or "qqbot"
self._ready = True
# 曾发过消息的用户/群,用于无默认接收者时的广播 {(target_id, is_group), ...}
self._known_targets: set = set()
_safe_name = hashlib.md5(self._config_name.encode()).hexdigest()[:12]
self._cache_key = f"__qqbot_known_targets_{_safe_name}__"
self._filecache = FileCache()
self._load_known_targets()
# 已处理的消息 ID用于去重避免同一条消息重复处理
self._processed_msg_ids: set = set()
self._max_processed_ids = 1000
# Gateway 后台线程
self._gateway_stop = threading.Event()
self._gateway_thread = None
self._start_gateway()
logger.info("QQ Bot 客户端初始化完成")
def _load_known_targets(self) -> None:
"""从缓存加载曾互动的用户/群"""
try:
content = self._filecache.get(self._cache_key)
if content:
data = pickle.loads(content)
if isinstance(data, (list, set)):
self._known_targets = set(tuple(x) for x in data)
except Exception as e:
logger.debug(f"QQ Bot 加载 known_targets 失败: {e}")
def _save_known_targets(self) -> None:
"""持久化曾互动的用户/群到缓存"""
try:
self._filecache.set(self._cache_key, pickle.dumps(list(self._known_targets)))
except Exception as e:
logger.debug(f"QQ Bot 保存 known_targets 失败: {e}")
def _forward_to_message_chain(self, payload: dict) -> None:
"""直接调用消息链处理,避免 HTTP 开销"""
def _run():
try:
MessageChain().process(
body=payload,
form={},
args={"source": self._config_name},
)
except Exception as e:
logger.error(f"QQ Bot 转发消息失败: {e}")
threading.Thread(target=_run, daemon=True).start()
def _on_gateway_message(self, payload: dict) -> None:
"""Gateway 收到消息时转发至 MP 消息链,并记录发送者用于广播"""
msg_id = payload.get("id")
if msg_id:
if msg_id in self._processed_msg_ids:
logger.debug(f"QQ Bot: 跳过重复消息 id={msg_id}")
return
self._processed_msg_ids.add(msg_id)
if len(self._processed_msg_ids) > self._max_processed_ids:
self._processed_msg_ids.clear()
# 记录发送者,用于无默认接收者时的广播
msg_type = payload.get("type")
if msg_type == "C2C_MESSAGE_CREATE":
openid = (payload.get("author") or {}).get("user_openid")
if openid:
self._known_targets.add((openid, False))
self._save_known_targets()
elif msg_type == "GROUP_AT_MESSAGE_CREATE":
group_openid = payload.get("group_openid")
if group_openid:
self._known_targets.add((group_openid, True))
self._save_known_targets()
self._forward_to_message_chain(payload)
def _start_gateway(self) -> None:
"""启动 Gateway WebSocket 连接(后台线程)"""
try:
self._gateway_thread = threading.Thread(
target=run_gateway,
kwargs={
"app_id": self._app_id,
"app_secret": self._app_secret,
"config_name": self._config_name,
"get_token_fn": get_access_token,
"get_gateway_url_fn": get_gateway_url,
"on_message_fn": self._on_gateway_message,
"stop_event": self._gateway_stop,
},
daemon=True,
)
self._gateway_thread.start()
logger.info(f"QQ Bot Gateway 已启动: {self._config_name}")
except Exception as e:
logger.error(f"QQ Bot Gateway 启动失败: {e}")
def stop(self) -> None:
"""停止 Gateway 连接"""
if self._gateway_stop:
self._gateway_stop.set()
if self._gateway_thread and self._gateway_thread.is_alive():
self._gateway_thread.join(timeout=5)
def get_state(self) -> bool:
"""获取就绪状态"""
return self._ready
def _get_target(self, userid: Optional[str] = None, targets: Optional[dict] = None) -> tuple:
"""
解析发送目标
:return: (target_id, is_group)
"""
# 优先使用 userid可能是 openid
if userid:
# 格式支持group:xxx 表示群聊
if str(userid).lower().startswith("group:"):
return userid[6:].strip(), True
return str(userid), False
# 从 targets 获取
if targets:
qq_openid = targets.get("qq_userid") or targets.get("qq_openid")
qq_group = targets.get("qq_group_openid") or targets.get("qq_group")
if qq_group:
return str(qq_group), True
if qq_openid:
return str(qq_openid), False
# 使用默认配置
if self._default_group_openid:
return self._default_group_openid, True
if self._default_openid:
return self._default_openid, False
return None, False
def _get_broadcast_targets(self) -> list:
"""获取广播目标列表(曾发过消息的用户/群)"""
return list(self._known_targets)
def send_msg(
self,
title: str,
text: Optional[str] = None,
image: Optional[str] = None,
link: Optional[str] = None,
userid: Optional[str] = None,
targets: Optional[dict] = None,
**kwargs,
) -> bool:
"""
发送 QQ 消息
:param title: 标题
:param text: 正文
:param image: 图片 URLQQ 主动消息暂不支持图片,可拼入文本)
:param link: 链接
:param userid: 目标 openid 或 group:xxx
:param targets: 目标字典
"""
if not self._ready:
return False
target, is_group = self._get_target(userid, targets)
targets_to_send = []
if target:
targets_to_send = [(target, is_group)]
else:
# 无默认接收者时,向曾发过消息的用户/群广播
broadcast = self._get_broadcast_targets()
if broadcast:
targets_to_send = broadcast
logger.debug(f"QQ Bot: 广播模式,共 {len(targets_to_send)} 个目标")
else:
logger.warn("QQ Bot: 未指定接收者且无互动用户,请在配置中设置 QQ_OPENID/QQ_GROUP_OPENID 或先让用户发消息")
return False
# 拼接消息内容
parts = []
if title:
parts.append(f"{title}")
if text:
parts.append(text)
if image:
parts.append(image)
if link:
parts.append(link)
content = "\n".join(parts).strip()
if not content:
logger.warn("QQ Bot: 消息内容为空")
return False
success_count = 0
try:
token = get_access_token(self._app_id, self._app_secret)
for tgt, tgt_is_group in targets_to_send:
try:
if tgt_is_group:
send_proactive_group_message(token, tgt, content)
else:
send_proactive_c2c_message(token, tgt, content)
success_count += 1
logger.debug(f"QQ Bot: 消息已发送到 {'' if tgt_is_group else '用户'} {tgt}")
except Exception as e:
logger.error(f"QQ Bot 发送失败 ({tgt}): {e}")
return success_count > 0
except Exception as e:
logger.error(f"QQ Bot 发送失败: {e}")
return False
def send_medias_msg(
self,
medias: List[MediaInfo],
userid: Optional[str] = None,
title: Optional[str] = None,
link: Optional[str] = None,
**kwargs,
) -> bool:
"""发送媒体列表(转为文本)"""
if not medias:
return False
lines = [f"{i + 1}. {m.title_year} - {m.type.value}" for i, m in enumerate(medias)]
text = "\n".join(lines)
return self.send_msg(
title=title or "媒体列表",
text=text,
link=link,
userid=userid,
**kwargs,
)
def send_torrents_msg(
self,
torrents: List[Context],
userid: Optional[str] = None,
title: Optional[str] = None,
link: Optional[str] = None,
**kwargs,
) -> bool:
"""发送种子列表(转为文本)"""
if not torrents:
return False
lines = []
for i, ctx in enumerate(torrents):
t = ctx.torrent_info
meta = MetaInfo(t.title, t.description)
name = f"{meta.season_episode} {meta.resource_term} {meta.video_term}"
name = " ".join(name.split())
lines.append(f"{i + 1}.【{t.site_name}{name} {StringUtils.str_filesize(t.size)} {t.seeders}")
text = "\n".join(lines)
return self.send_msg(
title=title or "种子列表",
text=text,
link=link,
userid=userid,
**kwargs,
)

View File

@@ -114,6 +114,8 @@ class NotificationSwitch(BaseModel):
vocechat: Optional[bool] = False
# WebPush开关
webpush: Optional[bool] = False
# QQ开关
qq: Optional[bool] = False
class Subscription(BaseModel):
@@ -270,6 +272,15 @@ class ChannelCapabilityManager:
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.QQ: ChannelCapabilities(
channel=MessageChannel.QQ,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
)
}

View File

@@ -65,7 +65,7 @@ class NotificationConf(BaseModel):
# 名称
name: Optional[str] = None
# 类型 telegram/wechat/vocechat/synologychat/slack/webpush
# 类型 telegram/wechat/vocechat/synologychat/slack/webpush/qqbot
type: Optional[str] = None
# 配置
config: Optional[dict] = Field(default_factory=dict)

View File

@@ -287,6 +287,7 @@ class MessageChannel(Enum):
VoceChat = "VoceChat"
Web = "Web"
WebPush = "WebPush"
QQ = "QQ"
# 下载器类型