fix(rss): decode XML feeds before parsing (#6004)

This commit is contained in:
InfinityPacer
2026-06-26 06:37:18 +08:00
committed by GitHub
parent 43e89ebf77
commit 52c5f2900f
5 changed files with 235 additions and 33 deletions

View File

@@ -3,7 +3,6 @@ import traceback
from typing import List, Tuple, Union, Optional
from urllib.parse import urljoin
import chardet
from lxml import etree
from app.core.config import settings
@@ -228,21 +227,6 @@ class RssHelper:
},
}
def __decode_fast_text(self, raw_data: bytes, ret) -> Optional[str]:
    """
    Cheaply decode the payload with the response's declared encoding, then UTF-8.

    Per the original note, this is the quick first attempt that feeds the
    Rust parser fast path before any heavier detection is tried.

    :param raw_data: raw response body bytes
    :param ret: response-like object; only its optional ``encoding`` attribute is read
    :return: the decoded text, or None when no candidate encoding decodes the bytes
    """
    tried = set()
    for candidate in (getattr(ret, "encoding", None), "utf-8"):
        if not candidate:
            continue
        name = str(candidate).strip()
        # Codec lookup is case-insensitive, so de-duplicate the same way.
        key = name.lower()
        if not key or key in tried:
            continue
        tried.add(key)
        try:
            return raw_data.decode(name)
        except (LookupError, UnicodeDecodeError):
            # LookupError: the declared charset is unknown to Python; keep
            # going so the UTF-8 attempt still runs instead of crashing.
            continue
    return None
def __parse_with_rust(self, ret_xml: Optional[str]) -> Optional[list]:
"""
调用 Rust RSS 解析器,并统一处理基础 XML 校验和最大条目限制。
@@ -301,26 +285,14 @@ class RssHelper:
return False
if raw_data:
ret_xml = self.__decode_fast_text(raw_data, ret)
ret_xml = RequestUtils.get_decoded_xml_content(
ret,
performance_mode=settings.ENCODING_DETECTION_PERFORMANCE_MODE,
confidence_threshold=settings.ENCODING_DETECTION_MIN_CONFIDENCE
)
rust_items = self.__parse_with_rust(ret_xml)
if rust_items is not None:
return rust_items
if not ret_xml:
try:
result = chardet.detect(raw_data)
encoding = result['encoding']
# 解码为字符串
ret_xml = raw_data.decode(encoding)
except Exception as e:
logger.debug(f"chardet解码失败{str(e)}")
# 探测utf-8解码
match = re.search(r'encoding\s*=\s*["\']([^"\']+)["\']', ret.text)
if match:
encoding = match.group(1)
if encoding:
ret_xml = raw_data.decode(encoding)
else:
ret.encoding = ret.apparent_encoding
if not ret_xml:
ret_xml = ret.text

View File

@@ -769,6 +769,112 @@ class RequestUtils:
logger.debug(f"Error when detect_encoding_from_response: {str(e)}")
return fallback_encoding or "utf-8"
@staticmethod
def detect_xml_declared_encoding(raw_data: bytes) -> Optional[str]:
"""
从 XML 声明中读取字符集,适用于 RSS/Atom 等 XML 响应的 bytes 级解码。
"""
if not raw_data:
return None
xml_head = raw_data[:512].decode("ascii", errors="ignore")
match = re.search(
r"^\s*(?:\ufeff)?<\?xml[^>]*encoding\s*=\s*[\"']([^\"']+)[\"']",
xml_head,
re.IGNORECASE,
)
return match.group(1).strip() if match else None
@staticmethod
def is_low_confidence_http_encoding(encoding: Optional[str]) -> bool:
"""
判断 HTTP 客户端默认编码是否低可信,避免 latin1 类默认值吞掉 UTF-8 内容。
"""
if not encoding:
return False
normalized = encoding.strip().lower().replace("_", "-")
return normalized in {"iso-8859-1", "latin-1", "latin1"}
@staticmethod
def get_decoded_xml_content(
    response: Response,
    performance_mode: bool = False,
    confidence_threshold: float = 0.8,
) -> str:
    """
    Decode the body of an XML (e.g. RSS/Atom) response into text.

    Candidate encodings are tried in the order below; the first one that
    decodes the bytes without error wins:

    1. the encoding named in the XML declaration, unless it is a UTF-16 or
       UTF-32 family name (see the comment in the body);
    2. in performance mode: UTF-8, the response encoding and the apparent
       encoding (Latin-1 values excluded), then the chardet result if its
       confidence exceeds the threshold; in compatibility mode: the chardet
       result first, then that same list;
    3. the low-confidence chardet guess, UTF-8, and the unfiltered
       apparent/response encodings;
    4. UTF-8 with undecodable bytes replaced.

    :param response: HTTP response object
    :param performance_mode: prefer the lightweight rules before running
                             chardet; defaults to False (compatibility mode)
    :param confidence_threshold: minimum chardet confidence, default 0.8
    :return: decoded XML text; an empty string when the response is falsy
    """
    import codecs

    if not response:
        return ""

    raw_data = getattr(response, "content", None)
    if not raw_data:
        # Nothing to decode: fall back to whatever text the response exposes.
        return getattr(response, "text", "") or ""

    def _try_decode(encodings):
        # Return the text from the first usable encoding, or None.
        seen_encodings = set()
        for encoding in encodings:
            if not encoding:
                continue
            normalized = str(encoding).strip()
            if not normalized or normalized.lower() in seen_encodings:
                continue
            seen_encodings.add(normalized.lower())
            try:
                return raw_data.decode(normalized)
            except (LookupError, UnicodeDecodeError):
                continue
        return None

    def _is_wide_unicode(encoding) -> bool:
        # True when the name resolves to a UTF-16 / UTF-32 family codec.
        try:
            canonical = codecs.lookup(str(encoding).strip()).name
        except (LookupError, ValueError):
            return False
        return canonical.startswith(("utf-16", "utf-32"))

    xml_encoding = RequestUtils.detect_xml_declared_encoding(raw_data)
    # The declaration is located by scanning the bytes as ASCII, so a declared
    # UTF-16/UTF-32 encoding cannot be the real one. Decoding with it would
    # either fail or silently yield garbage, so skip it and use the fallbacks.
    if xml_encoding and not _is_wide_unicode(xml_encoding):
        decoded = _try_decode([xml_encoding])
        if decoded is not None:
            return decoded

    response_encoding = getattr(response, "encoding", None)
    trusted_response_encoding = (
        response_encoding
        if not RequestUtils.is_low_confidence_http_encoding(response_encoding)
        else None
    )
    apparent_encoding = getattr(response, "apparent_encoding", None)
    trusted_apparent_encoding = (
        apparent_encoding
        if not RequestUtils.is_low_confidence_http_encoding(apparent_encoding)
        else None
    )
    fallback_encoding = None

    try:
        if performance_mode:
            decoded = _try_decode(["utf-8", trusted_response_encoding, trusted_apparent_encoding])
            if decoded is not None:
                return decoded

        detection = chardet.detect(raw_data)
        # Treat a missing/None confidence as 0 so the comparison cannot raise
        # and short-circuit the remaining fallbacks.
        if (detection.get("confidence") or 0) > confidence_threshold:
            decoded = _try_decode([detection.get("encoding")])
            if decoded is not None:
                return decoded
        # Kept as a last-resort candidate even when confidence was too low.
        fallback_encoding = detection.get("encoding")

        if not performance_mode:
            decoded = _try_decode(["utf-8", trusted_response_encoding, trusted_apparent_encoding])
            if decoded is not None:
                return decoded

        # Last candidates, now including the encodings filtered out above.
        decoded = _try_decode([fallback_encoding, "utf-8", apparent_encoding, response_encoding])
        if decoded is not None:
            return decoded
    except Exception as e:
        logger.debug(f"Error when getting decoded XML content: {str(e)}")

    return raw_data.decode("utf-8", errors="replace")
@staticmethod
def get_decoded_html_content(
response: Response,

View File

@@ -0,0 +1,65 @@
from types import SimpleNamespace
from app.utils.http import RequestUtils
def test_xml_decoding_prefers_xml_declaration_over_http_default():
    """
    The XML declaration must take precedence over the HTTP default charset so
    that a UTF-8 RSS title is not mangled by a Latin-1 style decode.
    """
    xml = '<?xml version="1.0" encoding="UTF-8"?><rss><title>警察故事4简单任务</title></rss>'
    body = xml.encode("utf-8")
    fake_response = SimpleNamespace(
        content=body,
        encoding="ISO-8859-1",
        apparent_encoding="utf-8",
        # The UTF-8 bytes as they read when decoded with the Latin-1 default.
        text=body.decode("ISO-8859-1"),
    )

    result = RequestUtils.get_decoded_xml_content(fake_response, performance_mode=True)

    assert "警察故事4简单任务" in result
    assert "è­¦" not in result
def test_xml_decoding_uses_declared_non_utf8_encoding():
    """
    When the XML declaration names a non-UTF-8 charset, decoding must follow
    it, keeping GBK/Big5 style responses from older sites working.
    """
    xml = '<?xml version="1.0" encoding="GBK"?><rss><title>中文标题</title></rss>'
    body = xml.encode("gbk")
    fake_response = SimpleNamespace(
        content=body,
        encoding="ISO-8859-1",
        apparent_encoding="ISO-8859-1",
        text=body.decode("ISO-8859-1"),
    )

    result = RequestUtils.get_decoded_xml_content(fake_response, performance_mode=True)

    assert "中文标题" in result
def test_xml_decoding_skips_low_confidence_apparent_encoding():
    """
    A Latin-1 style apparent_encoding must not decode first; otherwise a
    Chinese RSS feed without an XML declaration would turn into mojibake.
    """
    xml = "<rss><title>中文标题</title></rss>"
    body = xml.encode("gbk")
    fake_response = SimpleNamespace(
        content=body,
        encoding="ISO-8859-1",
        apparent_encoding="ISO-8859-1",
        text=body.decode("ISO-8859-1"),
    )

    result = RequestUtils.get_decoded_xml_content(fake_response, performance_mode=True)

    assert "中文标题" in result
    assert "ÖÐÎÄ" not in result
def test_latin1_http_encoding_is_low_confidence():
    """
    Latin-1 style encodings are often filled in by the HTTP client as a
    default, so they must not be the preferred basis for XML/RSS decoding.
    """
    for latin1_name in ("ISO-8859-1", "latin-1"):
        assert RequestUtils.is_low_confidence_http_encoding(latin1_name)

    assert not RequestUtils.is_low_confidence_http_encoding("utf-8")

56
tests/test_rss_helper.py Normal file
View File

@@ -0,0 +1,56 @@
from types import SimpleNamespace
from app.helper import rss as rss_module
from app.helper.rss import RssHelper
from app.utils.http import RequestUtils
def test_rss_helper_decodes_utf8_xml_before_python_parser(monkeypatch):
    """
    RSS decoding should repair the XML text first and only then hand it to
    the Python fallback parsing path.
    """
    feed_xml = """
<?xml version="1.0" encoding="UTF-8"?>
<rss>
<channel>
<item>
<title><![CDATA[警察故事4简单任务 2160p]]></title>
<description><![CDATA[中文简介]]></description>
<link>https://example.com/details/4</link>
<pubDate>2026-06-25T10:30:00Z</pubDate>
</item>
</channel>
</rss>
""".strip()
    feed_bytes = feed_xml.encode("utf-8")

    class FakeRequestUtils:
        """
        Stand-in for RequestUtils that avoids real network requests.
        """

        get_decoded_xml_content = staticmethod(RequestUtils.get_decoded_xml_content)

        def __init__(self, **_kwargs):
            """
            Accept and ignore constructor keyword arguments, matching the way
            RssHelper instantiates the class.
            """

        def get_res(self, _url):
            """
            Return an RSS response object carrying a wrong HTTP default encoding.
            """
            return SimpleNamespace(
                status_code=200,
                content=feed_bytes,
                text=feed_bytes.decode("ISO-8859-1"),
                apparent_encoding="utf-8",
                encoding="ISO-8859-1",
            )

    monkeypatch.setattr(rss_module, "RequestUtils", FakeRequestUtils)
    # Stub the Rust parser to return None; presumably this pushes parsing
    # onto the Python fallback path that the test name refers to.
    monkeypatch.setattr(rss_module.rust_accel, "parse_rss_items", lambda *_args, **_kwargs: None)

    items = RssHelper().parse("https://example.com/rss")
    first_item = items[0]

    assert first_item["title"] == "警察故事4简单任务 2160p"
    assert first_item["description"] == "中文简介"

View File

@@ -17,6 +17,7 @@ from app.modules.indexer.spider import SiteSpider
from app.schemas.types import SystemConfigKey
from app.schemas.types import MediaType
from app.utils import rust_accel
from app.utils.http import RequestUtils
pytestmark = pytest.mark.skipif(
@@ -153,6 +154,8 @@ def test_rss_helper_parse_uses_rust_parser(monkeypatch):
测试用 RequestUtils避免真实网络请求。
"""
get_decoded_xml_content = staticmethod(RequestUtils.get_decoded_xml_content)
def __init__(self, **_kwargs):
"""
保存构造参数占位,兼容 RssHelper 的调用方式。