From 52c5f2900f336f1bc7051de8bd11b63ce448fd40 Mon Sep 17 00:00:00 2001
From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com>
Date: Fri, 26 Jun 2026 06:37:18 +0800
Subject: [PATCH] fix(rss): decode XML feeds before parsing (#6004)
---
app/helper/rss.py | 38 ++-----------
app/utils/http.py | 106 ++++++++++++++++++++++++++++++++++++
tests/test_http_encoding.py | 65 ++++++++++++++++++++++
tests/test_rss_helper.py | 56 +++++++++++++++++++
tests/test_rust_accel.py | 3 +
5 files changed, 235 insertions(+), 33 deletions(-)
create mode 100644 tests/test_http_encoding.py
create mode 100644 tests/test_rss_helper.py
diff --git a/app/helper/rss.py b/app/helper/rss.py
index a40bea86..d7633a25 100644
--- a/app/helper/rss.py
+++ b/app/helper/rss.py
@@ -3,7 +3,6 @@ import traceback
from typing import List, Tuple, Union, Optional
from urllib.parse import urljoin
-import chardet
from lxml import etree
from app.core.config import settings
@@ -228,21 +227,6 @@ class RssHelper:
},
}
- def __decode_fast_text(self, raw_data: bytes, ret) -> Optional[str]:
- """
- 使用响应声明编码或 UTF-8 快速解码,优先服务 Rust 解析快路径。
- """
- seen_encodings = set()
- for encoding in (getattr(ret, "encoding", None), "utf-8"):
- if not encoding or encoding in seen_encodings:
- continue
- seen_encodings.add(encoding)
- try:
- return raw_data.decode(encoding)
- except UnicodeDecodeError:
- continue
- return None
-
def __parse_with_rust(self, ret_xml: Optional[str]) -> Optional[list]:
"""
调用 Rust RSS 解析器,并统一处理基础 XML 校验和最大条目限制。
@@ -301,26 +285,14 @@ class RssHelper:
return False
if raw_data:
- ret_xml = self.__decode_fast_text(raw_data, ret)
+ ret_xml = RequestUtils.get_decoded_xml_content(
+ ret,
+ performance_mode=settings.ENCODING_DETECTION_PERFORMANCE_MODE,
+ confidence_threshold=settings.ENCODING_DETECTION_MIN_CONFIDENCE
+ )
rust_items = self.__parse_with_rust(ret_xml)
if rust_items is not None:
return rust_items
- if not ret_xml:
- try:
- result = chardet.detect(raw_data)
- encoding = result['encoding']
- # 解码为字符串
- ret_xml = raw_data.decode(encoding)
- except Exception as e:
- logger.debug(f"chardet解码失败:{str(e)}")
- # 探测utf-8解码
- match = re.search(r'encoding\s*=\s*["\']([^"\']+)["\']', ret.text)
- if match:
- encoding = match.group(1)
- if encoding:
- ret_xml = raw_data.decode(encoding)
- else:
- ret.encoding = ret.apparent_encoding
if not ret_xml:
ret_xml = ret.text
diff --git a/app/utils/http.py b/app/utils/http.py
index 3556c99a..dbf8ddf2 100644
--- a/app/utils/http.py
+++ b/app/utils/http.py
@@ -769,6 +769,112 @@ class RequestUtils:
logger.debug(f"Error when detect_encoding_from_response: {str(e)}")
return fallback_encoding or "utf-8"
+ @staticmethod
+ def detect_xml_declared_encoding(raw_data: bytes) -> Optional[str]:
+ """
+ Read the charset named in the XML declaration; intended for byte-level decoding of XML responses such as RSS/Atom.
+ """
+ if not raw_data:
+ return None
+ xml_head = raw_data[:512].decode("ascii", errors="ignore")  # inspect only the first 512 bytes; non-ASCII bytes are dropped
+ match = re.search(
+ r"^\s*(?:\ufeff)?<\?xml[^>]*encoding\s*=\s*[\"']([^\"']+)[\"']",  # NOTE(review): xml_head is ASCII-decoded with errors="ignore", so U+FEFF cannot occur in it and the BOM group looks unreachable - confirm
+ xml_head,
+ re.IGNORECASE,
+ )
+ return match.group(1).strip() if match else None  # None when no leading declaration with an encoding attribute is found
+
+ @staticmethod
+ def is_low_confidence_http_encoding(encoding: Optional[str]) -> bool:
+ """
+ Tell whether an HTTP client default encoding is low-confidence, so that latin1-style defaults do not swallow UTF-8 content.
+ """
+ if not encoding:
+ return False
+ normalized = encoding.strip().lower().replace("_", "-")  # case-insensitive; underscores are treated as hyphens
+ return normalized in {"iso-8859-1", "latin-1", "latin1"}  # NOTE(review): other aliases of this codec (e.g. "iso8859-1", "l1", "cp819") are not matched - confirm whether they should be
+
+ @staticmethod
+ def get_decoded_xml_content(
+ response: Response,
+ performance_mode: bool = False,
+ confidence_threshold: float = 0.8,
+ ) -> str:
+ """
+ Return the decoded text of an XML response, honouring the XML declaration first and avoiding low-confidence HTTP default encodings.
+
+ :param response: HTTP response object
+ :param performance_mode: whether to try the lightweight rules first; defaults to False (compatibility mode)
+ :param confidence_threshold: chardet detection confidence threshold; defaults to 0.8
+ :return: the decoded XML text
+ """
+ if not response:
+ return ""  # no response object: nothing to decode
+ raw_data = getattr(response, "content", None)
+ if not raw_data:
+ return getattr(response, "text", "") or ""  # no raw bytes: fall back to the response's own text
+
+ def _try_decode(encodings):  # returns the first successful decode of raw_data, or None
+ seen_encodings = set()
+ for encoding in encodings:
+ if not encoding:
+ continue
+ normalized = str(encoding).strip()
+ if not normalized or normalized.lower() in seen_encodings:  # skip blanks and case-insensitive duplicates
+ continue
+ seen_encodings.add(normalized.lower())
+ try:
+ return raw_data.decode(normalized)
+ except (LookupError, UnicodeDecodeError):  # unknown codec name or undecodable bytes: try the next candidate
+ continue
+ return None
+
+ xml_encoding = RequestUtils.detect_xml_declared_encoding(raw_data)
+ if xml_encoding:
+ decoded = _try_decode([xml_encoding])  # the encoding named by the XML declaration is tried before anything else
+ if decoded is not None:
+ return decoded
+
+ response_encoding = getattr(response, "encoding", None)
+ trusted_response_encoding = (  # None when response.encoding is latin1-like
+ response_encoding
+ if not RequestUtils.is_low_confidence_http_encoding(response_encoding)
+ else None
+ )
+ apparent_encoding = getattr(response, "apparent_encoding", None)
+ trusted_apparent_encoding = (  # None when response.apparent_encoding is latin1-like
+ apparent_encoding
+ if not RequestUtils.is_low_confidence_http_encoding(apparent_encoding)
+ else None
+ )
+
+ fallback_encoding = None
+ try:
+ if performance_mode:
+ decoded = _try_decode(["utf-8", trusted_response_encoding, trusted_apparent_encoding])  # performance mode: cheap candidates before chardet
+ if decoded is not None:
+ return decoded
+
+ detection = chardet.detect(raw_data)
+ if detection.get("confidence", 0) > confidence_threshold:
+ decoded = _try_decode([detection.get("encoding")])
+ if decoded is not None:
+ return decoded
+ fallback_encoding = detection.get("encoding")  # keep chardet's guess as a candidate for the final attempt below
+
+ if not performance_mode:
+ decoded = _try_decode(["utf-8", trusted_response_encoding, trusted_apparent_encoding])  # compatibility mode: same candidates, tried after chardet
+ if decoded is not None:
+ return decoded
+
+ decoded = _try_decode([fallback_encoding, "utf-8", apparent_encoding, response_encoding])  # last attempt also admits the latin1-like encodings filtered out above
+ if decoded is not None:
+ return decoded
+ except Exception as e:
+ logger.debug(f"Error when getting decoded XML content: {str(e)}")
+
+ return raw_data.decode("utf-8", errors="replace")  # final fallback: UTF-8 with undecodable bytes replaced
+
@staticmethod
def get_decoded_html_content(
response: Response,
diff --git a/tests/test_http_encoding.py b/tests/test_http_encoding.py
new file mode 100644
index 00000000..a2e68eea
--- /dev/null
+++ b/tests/test_http_encoding.py
@@ -0,0 +1,65 @@
+from types import SimpleNamespace
+
+from app.utils.http import RequestUtils
+
+
+ def test_xml_decoding_prefers_xml_declaration_over_http_default():
+ """
+ The XML declaration should take precedence over the HTTP default encoding, so UTF-8 RSS titles are not garbled by latin1-style decoding.
+ """
+ xml = '警察故事4:简单任务'  # NOTE(review): no XML declaration is visible in this literal - markup may have been stripped from this copy of the patch; confirm against the commit
+ response = SimpleNamespace(
+ content=xml.encode("utf-8"),
+ encoding="ISO-8859-1",
+ apparent_encoding="utf-8",
+ text=xml.encode("utf-8").decode("ISO-8859-1"),
+ )
+
+ decoded = RequestUtils.get_decoded_xml_content(response, performance_mode=True)
+
+ assert "警察故事4:简单任务" in decoded
+ assert "è¦" not in decoded
+
+
+ def test_xml_decoding_uses_declared_non_utf8_encoding():
+ """
+ When the XML declaration names a non-UTF-8 encoding, decode according to the declaration, for compatibility with GBK/Big5-style responses from legacy sites.
+ """
+ xml = '中文标题'  # NOTE(review): no XML declaration is visible in this literal - markup may have been stripped from this copy of the patch; confirm against the commit
+ response = SimpleNamespace(
+ content=xml.encode("gbk"),
+ encoding="ISO-8859-1",
+ apparent_encoding="ISO-8859-1",
+ text=xml.encode("gbk").decode("ISO-8859-1"),
+ )
+
+ decoded = RequestUtils.get_decoded_xml_content(response, performance_mode=True)
+
+ assert "中文标题" in decoded
+
+
+ def test_xml_decoding_skips_low_confidence_apparent_encoding():
+ """
+ A latin1-style apparent_encoding must not get to decode first, so Chinese RSS without an XML declaration is not turned into mojibake.
+ """
+ xml = "中文标题"
+ response = SimpleNamespace(
+ content=xml.encode("gbk"),
+ encoding="ISO-8859-1",
+ apparent_encoding="ISO-8859-1",
+ text=xml.encode("gbk").decode("ISO-8859-1"),
+ )
+
+ decoded = RequestUtils.get_decoded_xml_content(response, performance_mode=True)
+
+ assert "中文标题" in decoded
+ assert "ÖÐÎÄ" not in decoded
+
+
+ def test_latin1_http_encoding_is_low_confidence():
+ """
+ latin1-style encodings are often filled in by the HTTP client as a default and must not be the preferred basis for decoding XML/RSS.
+ """
+ assert RequestUtils.is_low_confidence_http_encoding("ISO-8859-1")
+ assert RequestUtils.is_low_confidence_http_encoding("latin-1")
+ assert not RequestUtils.is_low_confidence_http_encoding("utf-8")
diff --git a/tests/test_rss_helper.py b/tests/test_rss_helper.py
new file mode 100644
index 00000000..8f9e97ed
--- /dev/null
+++ b/tests/test_rss_helper.py
@@ -0,0 +1,56 @@
+from types import SimpleNamespace
+
+from app.helper import rss as rss_module
+from app.helper.rss import RssHelper
+from app.utils.http import RequestUtils
+
+
+ def test_rss_helper_decodes_utf8_xml_before_python_parser(monkeypatch):
+ """
+ RSS decoding should fix up the XML text first and then hand it to the Python fallback parsing path.
+ """
+ xml = """
+
+
+
+ -
+
+
+ https://example.com/details/4
+ 2026-06-25T10:30:00Z
+
+
+
+ """.strip()
+
+ class FakeRequestUtils:
+ """
+ Test double for RequestUtils that avoids real network requests.
+ """
+
+ get_decoded_xml_content = staticmethod(RequestUtils.get_decoded_xml_content)  # reuse the real decoder so the test exercises it
+
+ def __init__(self, **_kwargs):
+ """
+ Placeholder that accepts constructor arguments, compatible with how RssHelper calls it.
+ """
+
+ def get_res(self, _url):
+ """
+ Return an RSS response object carrying a wrong HTTP default encoding.
+ """
+ return SimpleNamespace(
+ status_code=200,
+ content=xml.encode("utf-8"),
+ text=xml.encode("utf-8").decode("ISO-8859-1"),
+ apparent_encoding="utf-8",
+ encoding="ISO-8859-1",
+ )
+
+ monkeypatch.setattr(rss_module, "RequestUtils", FakeRequestUtils)
+ monkeypatch.setattr(rss_module.rust_accel, "parse_rss_items", lambda *_args, **_kwargs: None)  # stub the Rust parser to return None
+
+ result = RssHelper().parse("https://example.com/rss")
+
+ assert result[0]["title"] == "警察故事4:简单任务 2160p"
+ assert result[0]["description"] == "中文简介"
diff --git a/tests/test_rust_accel.py b/tests/test_rust_accel.py
index d1e4c804..9669ec54 100644
--- a/tests/test_rust_accel.py
+++ b/tests/test_rust_accel.py
@@ -17,6 +17,7 @@ from app.modules.indexer.spider import SiteSpider
from app.schemas.types import SystemConfigKey
from app.schemas.types import MediaType
from app.utils import rust_accel
+from app.utils.http import RequestUtils
pytestmark = pytest.mark.skipif(
@@ -153,6 +154,8 @@ def test_rss_helper_parse_uses_rust_parser(monkeypatch):
测试用 RequestUtils,避免真实网络请求。
"""
+ get_decoded_xml_content = staticmethod(RequestUtils.get_decoded_xml_content)
+
def __init__(self, **_kwargs):
"""
保存构造参数占位,兼容 RssHelper 的调用方式。