diff --git a/app/modules/discord/__init__.py b/app/modules/discord/__init__.py index 19b82cf6..82fc8664 100644 --- a/app/modules/discord/__init__.py +++ b/app/modules/discord/__init__.py @@ -23,6 +23,7 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]): if not Discord: logger.error("Discord 依赖未就绪(需要安装 discord.py==2.6.4),模块未启动") return + self.stop() super().init_service(service_name=Discord.__name__.lower(), service_type=Discord) self._channel = MessageChannel.Discord @@ -154,7 +155,8 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]): image=message.image, userid=userid, link=message.link, buttons=message.buttons, original_message_id=message.original_message_id, - original_chat_id=message.original_chat_id) + original_chat_id=message.original_chat_id, + mtype=message.mtype) def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None: """ diff --git a/app/modules/discord/discord.py b/app/modules/discord/discord.py index a327869f..1bd11a4e 100644 --- a/app/modules/discord/discord.py +++ b/app/modules/discord/discord.py @@ -11,8 +11,18 @@ from app.core.config import settings from app.core.context import MediaInfo, Context from app.core.metainfo import MetaInfo from app.log import logger +from app.schemas.types import NotificationType from app.utils.string import StringUtils +# Discord embed 字段解析白名单 +# 只有这些消息类型会使用复杂的字段解析逻辑 +PARSE_FIELD_TYPES = { + NotificationType.Download, # 资源下载 + NotificationType.Organize, # 整理入库 + NotificationType.Subscribe, # 订阅 + NotificationType.Manual, # 手动处理 +} + class Discord: """ @@ -156,7 +166,8 @@ class Discord: userid: Optional[str] = None, link: Optional[str] = None, buttons: Optional[List[List[dict]]] = None, original_message_id: Optional[Union[int, str]] = None, - original_chat_id: Optional[str] = None) -> Optional[bool]: + original_chat_id: Optional[str] = None, + mtype: Optional['NotificationType'] = None) -> Optional[bool]: if not self.get_state(): return False if not title and not text: @@ -168,7 +179,8 @@ class Discord: self._send_message(title=title, text=text, image=image, userid=userid, link=link, buttons=buttons, original_message_id=original_message_id, - original_chat_id=original_chat_id), + original_chat_id=original_chat_id, + mtype=mtype), self._loop) return future.result(timeout=30) except Exception as err: @@ -240,13 +252,14 @@ class Discord: userid: Optional[str], link: Optional[str], buttons: Optional[List[List[dict]]], original_message_id: Optional[Union[int, str]], - original_chat_id: Optional[str]) -> bool: + original_chat_id: Optional[str], + mtype: Optional['NotificationType'] = None) -> bool: channel = await self._resolve_channel(userid=userid, chat_id=original_chat_id) if not channel: logger.error("未找到可用的 Discord 频道或私聊") return False - embed = self._build_embed(title=title, text=text, image=image, link=link) + embed = self._build_embed(title=title, text=text, image=image, link=link, mtype=mtype) view = self._build_view(buttons=buttons, link=link) content = None @@ -318,9 +331,10 @@ class Discord: @staticmethod def _build_embed(title: str, text: Optional[str], image: Optional[str], - link: Optional[str]) -> discord.Embed: + link: Optional[str], mtype: Optional['NotificationType'] = None) -> discord.Embed: fields: List[Dict[str, str]] = [] desc_lines: List[str] = [] + should_parse_fields = mtype in PARSE_FIELD_TYPES if mtype else False def _collect_spans(s: str, left: str, right: str) -> List[Tuple[int, int]]: spans: List[Tuple[int, int]] = [] start = 0 @@ -346,53 +360,56 @@ class Discord: # 处理上游未反序列化的 "\n" 等转义换行,避免被当成普通字符 if "\\n" in text or "\\r" in text: text = text.replace("\\r\\n", "\n").replace("\\n", "\n").replace("\\r", "\n") - # 匹配形如 "字段:值" 的片段,字段名不允许包含常见分隔符; - # 下一个字段需以顿号/逗号/分号等分隔开,且不能是 URL 协议开头,避免值里出现 URL 的":" 被误拆 - name_re = r"[A-Za-z0-9\u4e00-\u9fa5_\-&]+" - pair_pattern = re.compile( - rf"({name_re})[::](.*?)(?=(?:[,,。;;、]+\s*(?!https?://|ftp://|ftps://|magnet:){name_re}[::])|$)", - re.IGNORECASE, - ) - for line in text.splitlines(): - line = line.strip() - if not line: - continue - matches = list(pair_pattern.finditer(line)) - if matches: - book_spans = _collect_spans(line, "《", "》") + _collect_spans(line, "【", "】") - if book_spans: - has_book_colon = False - for m in matches: - colon_idx = _find_colon_index(line, m) - if colon_idx is not None and any(l < colon_idx < r for l, r in book_spans): - has_book_colon = True - break - if has_book_colon: + if not should_parse_fields: + desc_lines.append(text.strip()) + else: + # 匹配形如 "字段:值" 的片段,字段名不允许包含常见分隔符; + # 下一个字段需以顿号/逗号/分号等分隔开,且不能是 URL 协议开头,避免值里出现 URL 的":" 被误拆 + name_re = r"[A-Za-z0-9\u4e00-\u9fa5_\-&]+" + pair_pattern = re.compile( + rf"({name_re})[::](.*?)(?=(?:[,,。;;、]+\s*(?!https?://|ftp://|ftps://|magnet:){name_re}[::])|$)", + re.IGNORECASE, + ) + for line in text.splitlines(): + line = line.strip() + if not line: + continue + matches = list(pair_pattern.finditer(line)) + if matches: + book_spans = _collect_spans(line, "《", "》") + _collect_spans(line, "【", "】") + if book_spans: + has_book_colon = False + for m in matches: + colon_idx = _find_colon_index(line, m) + if colon_idx is not None and any(l < colon_idx < r for l, r in book_spans): + has_book_colon = True + break + if has_book_colon: + desc_lines.append(line) + continue + # 若整行只是 URL/时间等自然包含":"的内容,则不当作字段 + url_like_names = {"http", "https", "ftp", "ftps", "magnet"} + if all(m.group(1).lower() in url_like_names or m.group(1).isdigit() for m in matches): desc_lines.append(line) continue - # 若整行只是 URL/时间等自然包含":"的内容,则不当作字段 - url_like_names = {"http", "https", "ftp", "ftps", "magnet"} - if all(m.group(1).lower() in url_like_names or m.group(1).isdigit() for m in matches): + last_end = 0 + for m in matches: + # 追加匹配前的非空文本到描述 + prefix = line[last_end:m.start()].strip(" ,,;;。、") + # 仅当前缀不全是分隔符/空白时才记录 + if prefix and prefix.strip(" ,,;;。、"): + desc_lines.append(prefix) + name = m.group(1).strip() + value = m.group(2).strip(" ,,;;。、\t") or "-" + if name: + fields.append({"name": name, "value": value, "inline": False}) + last_end = m.end() + # 匹配末尾后的文本 + suffix = line[last_end:].strip(" ,,;;。、") + if suffix and suffix.strip(" ,,;;。、"): + desc_lines.append(suffix) + else: desc_lines.append(line) - continue - last_end = 0 - for m in matches: - # 追加匹配前的非空文本到描述 - prefix = line[last_end:m.start()].strip(" ,,;;。、") - # 仅当前缀不全是分隔符/空白时才记录 - if prefix and prefix.strip(" ,,;;。、"): - desc_lines.append(prefix) - name = m.group(1).strip() - value = m.group(2).strip(" ,,;;。、\t") or "-" - if name: - fields.append({"name": name, "value": value, "inline": False}) - last_end = m.end() - # 匹配末尾后的文本 - suffix = line[last_end:].strip(" ,,;;。、") - if suffix and suffix.strip(" ,,;;。、"): - desc_lines.append(suffix) - else: - desc_lines.append(line) description = "\n".join(desc_lines).strip() if not description and not fields and text: description = text.strip()