diff --git a/app/helper/rss.py b/app/helper/rss.py
index a40bea86..d7633a25 100644
--- a/app/helper/rss.py
+++ b/app/helper/rss.py
@@ -3,7 +3,6 @@ import traceback
 from typing import List, Tuple, Union, Optional
 from urllib.parse import urljoin
 
-import chardet
 from lxml import etree
 
 from app.core.config import settings
@@ -228,21 +227,6 @@ class RssHelper:
         },
     }
 
-    def __decode_fast_text(self, raw_data: bytes, ret) -> Optional[str]:
-        """
-        使用响应声明编码或 UTF-8 快速解码,优先服务 Rust 解析快路径。
-        """
-        seen_encodings = set()
-        for encoding in (getattr(ret, "encoding", None), "utf-8"):
-            if not encoding or encoding in seen_encodings:
-                continue
-            seen_encodings.add(encoding)
-            try:
-                return raw_data.decode(encoding)
-            except UnicodeDecodeError:
-                continue
-        return None
-
     def __parse_with_rust(self, ret_xml: Optional[str]) -> Optional[list]:
         """
         调用 Rust RSS 解析器,并统一处理基础 XML 校验和最大条目限制。
@@ -301,26 +285,14 @@ class RssHelper:
                     return False
 
                 if raw_data:
-                    ret_xml = self.__decode_fast_text(raw_data, ret)
+                    ret_xml = RequestUtils.get_decoded_xml_content(
+                        ret,
+                        performance_mode=settings.ENCODING_DETECTION_PERFORMANCE_MODE,
+                        confidence_threshold=settings.ENCODING_DETECTION_MIN_CONFIDENCE
+                    )
                     rust_items = self.__parse_with_rust(ret_xml)
                     if rust_items is not None:
                         return rust_items
-                    if not ret_xml:
-                        try:
-                            result = chardet.detect(raw_data)
-                            encoding = result['encoding']
-                            # 解码为字符串
-                            ret_xml = raw_data.decode(encoding)
-                        except Exception as e:
-                            logger.debug(f"chardet解码失败:{str(e)}")
-                            # 探测utf-8解码
-                            match = re.search(r'encoding\s*=\s*["\']([^"\']+)["\']', ret.text)
-                            if match:
-                                encoding = match.group(1)
-                                if encoding:
-                                    ret_xml = raw_data.decode(encoding)
-                            else:
-                                ret.encoding = ret.apparent_encoding
                 if not ret_xml:
                     ret_xml = ret.text
 
diff --git a/app/utils/http.py b/app/utils/http.py
index 3556c99a..dbf8ddf2 100644
--- a/app/utils/http.py
+++ b/app/utils/http.py
@@ -769,6 +769,143 @@ class RequestUtils:
             logger.debug(f"Error when detect_encoding_from_response: {str(e)}")
         return fallback_encoding or "utf-8"
 
+    @staticmethod
+    def detect_xml_declared_encoding(raw_data: bytes) -> Optional[str]:
+        """
+        Read the charset named in the XML declaration of a raw XML payload.
+
+        Works at the bytes level so RSS/Atom responses can be decoded correctly.
+        Only the first 512 bytes are inspected.
+
+        :param raw_data: raw response body
+        :return: declared encoding name, or None when no declaration is found
+        """
+        if not raw_data:
+            return None
+        xml_head = raw_data[:512].decode("ascii", errors="ignore")
+        match = re.search(
+            r"^\s*(?:\ufeff)?<\?xml[^>]*encoding\s*=\s*[\"']([^\"']+)[\"']",
+            xml_head,
+            re.IGNORECASE,
+        )
+        return match.group(1).strip() if match else None
+
+    @staticmethod
+    def is_low_confidence_http_encoding(encoding: Optional[str]) -> bool:
+        """
+        Tell whether an encoding is a latin1-style HTTP client default.
+
+        Such defaults decode any byte sequence without error, so trusting them
+        would silently turn UTF-8 content into mojibake.
+
+        :param encoding: encoding name reported for the response, may be None
+        :return: True for ISO-8859-1 / latin-1 / latin1 (case and "_" insensitive)
+        """
+        if not encoding:
+            return False
+        normalized = encoding.strip().lower().replace("_", "-")
+        return normalized in {"iso-8859-1", "latin-1", "latin1"}
+
+    @staticmethod
+    def get_decoded_xml_content(
+        response: Response,
+        performance_mode: bool = False,
+        confidence_threshold: float = 0.8,
+    ) -> str:
+        """
+        Decode an XML (RSS/Atom) response body to text.
+
+        The XML declaration wins when it decodes cleanly. Otherwise UTF-8, the
+        HTTP encoding, the apparent encoding (latin1-style values are skipped at
+        this stage) and chardet are consulted; performance mode tries the cheap
+        candidates before chardet, compatibility mode tries chardet first.
+        The last resort is UTF-8 with replacement characters.
+
+        :param response: HTTP response object
+        :param performance_mode: try lightweight rules before chardet, default False (compatibility mode)
+        :param confidence_threshold: minimum chardet confidence to trust, default 0.8
+        :return: decoded XML text, "" when there is no response
+        """
+        if not response:
+            return ""
+        raw_data = getattr(response, "content", None)
+        if not raw_data:
+            return getattr(response, "text", "") or ""
+
+        def _try_decode(encodings):
+            # Return the first successful decode; skip blanks and duplicates.
+            seen_encodings = set()
+            for encoding in encodings:
+                if not encoding:
+                    continue
+                normalized = str(encoding).strip()
+                if not normalized or normalized.lower() in seen_encodings:
+                    continue
+                seen_encodings.add(normalized.lower())
+                try:
+                    return raw_data.decode(normalized)
+                except (LookupError, UnicodeDecodeError):
+                    continue
+            return None
+
+        def _trusted(encoding):
+            # Drop latin1-style defaults, which "succeed" on any input.
+            if RequestUtils.is_low_confidence_http_encoding(encoding):
+                return None
+            return encoding
+
+        apparent_cache = []
+
+        def _apparent_encoding():
+            # requests computes apparent_encoding by running charset detection
+            # over the whole body, so read it lazily and at most once.
+            if not apparent_cache:
+                apparent_cache.append(getattr(response, "apparent_encoding", None))
+            return apparent_cache[0]
+
+        def _light_candidates():
+            # Generator: _try_decode stops at the first success, so the apparent
+            # encoding is only computed once the cheaper candidates have failed.
+            yield "utf-8"
+            yield _trusted(getattr(response, "encoding", None))
+            yield _trusted(_apparent_encoding())
+
+        xml_encoding = RequestUtils.detect_xml_declared_encoding(raw_data)
+        if xml_encoding:
+            decoded = _try_decode([xml_encoding])
+            if decoded is not None:
+                return decoded
+
+        try:
+            if performance_mode:
+                decoded = _try_decode(_light_candidates())
+                if decoded is not None:
+                    return decoded
+
+            detection = chardet.detect(raw_data) or {}
+            detected_encoding = detection.get("encoding")
+            # Some chardet-compatible detectors report a None confidence.
+            if (detection.get("confidence") or 0) > confidence_threshold:
+                decoded = _try_decode([detected_encoding])
+                if decoded is not None:
+                    return decoded
+
+            if not performance_mode:
+                decoded = _try_decode(_light_candidates())
+                if decoded is not None:
+                    return decoded
+
+            # Last chance: accept any candidate, including latin1-style ones.
+            decoded = _try_decode(
+                [detected_encoding, "utf-8", _apparent_encoding(), getattr(response, "encoding", None)]
+            )
+            if decoded is not None:
+                return decoded
+        except Exception as e:
+            logger.debug(f"Error when getting decoded XML content: {str(e)}")
+
+        return raw_data.decode("utf-8", errors="replace")
+
     @staticmethod
     def get_decoded_html_content(
         response: Response,
diff --git a/tests/test_http_encoding.py b/tests/test_http_encoding.py
new file mode 100644
index 00000000..a2e68eea
--- /dev/null
+++ b/tests/test_http_encoding.py
@@ -0,0 +1,75 @@
+from types import SimpleNamespace
+
+from app.utils.http import RequestUtils
+
+
+def test_xml_decoding_prefers_xml_declaration_over_http_default():
+    """
+    The XML declaration must win over the HTTP default encoding so UTF-8 RSS
+    titles are not mangled by a latin1-style decode.
+    """
+    xml = (
+        '<?xml version="1.0" encoding="UTF-8"?>'
+        '<rss><channel><item><title>警察故事4:简单任务</title></item></channel></rss>'
+    )
+    response = SimpleNamespace(
+        content=xml.encode("utf-8"),
+        encoding="ISO-8859-1",
+        apparent_encoding="utf-8",
+        text=xml.encode("utf-8").decode("ISO-8859-1"),
+    )
+
+    decoded = RequestUtils.get_decoded_xml_content(response, performance_mode=True)
+
+    assert "警察故事4:简单任务" in decoded
+    # "警" decoded as latin1: U+00E8, U+00AD (soft hyphen), U+00A6.
+    assert "\u00e8\u00ad\u00a6" not in decoded
+
+
+def test_xml_decoding_uses_declared_non_utf8_encoding():
+    """
+    A non-UTF-8 XML declaration must be honoured, for legacy GBK/Big5 sites.
+    """
+    xml = (
+        '<?xml version="1.0" encoding="GBK"?>'
+        '<rss><channel><item><title>中文标题</title></item></channel></rss>'
+    )
+    response = SimpleNamespace(
+        content=xml.encode("gbk"),
+        encoding="ISO-8859-1",
+        apparent_encoding="ISO-8859-1",
+        text=xml.encode("gbk").decode("ISO-8859-1"),
+    )
+
+    decoded = RequestUtils.get_decoded_xml_content(response, performance_mode=True)
+
+    assert "中文标题" in decoded
+
+
+def test_xml_decoding_skips_low_confidence_apparent_encoding():
+    """
+    A latin1-style apparent_encoding must not decode first, otherwise Chinese
+    RSS without an XML declaration turns into mojibake.
+    """
+    xml = "<rss><channel><item><title>中文标题</title></item></channel></rss>"
+    response = SimpleNamespace(
+        content=xml.encode("gbk"),
+        encoding="ISO-8859-1",
+        apparent_encoding="ISO-8859-1",
+        text=xml.encode("gbk").decode("ISO-8859-1"),
+    )
+
+    decoded = RequestUtils.get_decoded_xml_content(response, performance_mode=True)
+
+    assert "中文标题" in decoded
+    assert "ÖÐÎÄ" not in decoded
+
+
+def test_latin1_http_encoding_is_low_confidence():
+    """
+    latin1-style encodings are usually HTTP client defaults and must not be the
+    primary basis for XML/RSS decoding.
+    """
+    assert RequestUtils.is_low_confidence_http_encoding("ISO-8859-1")
+    assert RequestUtils.is_low_confidence_http_encoding("latin-1")
+    assert not RequestUtils.is_low_confidence_http_encoding("utf-8")
diff --git a/tests/test_rss_helper.py b/tests/test_rss_helper.py
new file mode 100644
index 00000000..8f9e97ed
--- /dev/null
+++ b/tests/test_rss_helper.py
@@ -0,0 +1,56 @@
+from types import SimpleNamespace
+
+from app.helper import rss as rss_module
+from app.helper.rss import RssHelper
+from app.utils.http import RequestUtils
+
+
+def test_rss_helper_decodes_utf8_xml_before_python_parser(monkeypatch):
+    """
+    RSS decoding must fix the XML text before the Python fallback parser handles it.
+    """
+    xml = """
+    <?xml version="1.0" encoding="UTF-8"?>
+    <rss version="2.0">
+    <channel>
+    <item>
+    <title><![CDATA[警察故事4:简单任务 2160p]]></title>
+    <description><![CDATA[中文简介]]></description>
+    <link>https://example.com/details/4</link>
+    <pubDate>2026-06-25T10:30:00Z</pubDate>
+    </item>
+    </channel>
+    </rss>
+    """.strip()
+
+    class FakeRequestUtils:
+        """
+        Test double for RequestUtils that avoids real network requests.
+        """
+
+        get_decoded_xml_content = staticmethod(RequestUtils.get_decoded_xml_content)
+
+        def __init__(self, **_kwargs):
+            """
+            Accept and discard constructor arguments, matching how RssHelper calls it.
+            """
+
+        def get_res(self, _url):
+            """
+            Return an RSS response carrying a wrong HTTP default encoding.
+            """
+            return SimpleNamespace(
+                status_code=200,
+                content=xml.encode("utf-8"),
+                text=xml.encode("utf-8").decode("ISO-8859-1"),
+                apparent_encoding="utf-8",
+                encoding="ISO-8859-1",
+            )
+
+    monkeypatch.setattr(rss_module, "RequestUtils", FakeRequestUtils)
+    monkeypatch.setattr(rss_module.rust_accel, "parse_rss_items", lambda *_args, **_kwargs: None)
+
+    result = RssHelper().parse("https://example.com/rss")
+
+    assert result[0]["title"] == "警察故事4:简单任务 2160p"
+    assert result[0]["description"] == "中文简介"
diff --git a/tests/test_rust_accel.py b/tests/test_rust_accel.py
index d1e4c804..9669ec54 100644
--- a/tests/test_rust_accel.py
+++ b/tests/test_rust_accel.py
@@ -17,6 +17,7 @@ from app.modules.indexer.spider import SiteSpider
 from app.schemas.types import SystemConfigKey
 from app.schemas.types import MediaType
 from app.utils import rust_accel
+from app.utils.http import RequestUtils
 
 
 pytestmark = pytest.mark.skipif(
@@ -153,6 +154,8 @@ def test_rss_helper_parse_uses_rust_parser(monkeypatch):
         测试用 RequestUtils,避免真实网络请求。
         """
 
+        get_decoded_xml_content = staticmethod(RequestUtils.get_decoded_xml_content)
+
         def __init__(self, **_kwargs):
             """
             保存构造参数占位,兼容 RssHelper 的调用方式。