import re
from typing import Any, Optional, Tuple, Union

import chardet
import requests
import urllib3
from requests import Response, Session
from urllib3.exceptions import InsecureRequestWarning

from app.log import logger
# Suppress urllib3's InsecureRequestWarning: RequestUtils.request() defaults to
# verify=False, so certificate verification is off unless the caller enables it.
urllib3.disable_warnings(InsecureRequestWarning)
class RequestUtils:
    """
    Thin convenience wrapper around ``requests``.

    An instance carries default headers, cookies, proxies, timeout and an optional
    ``Session``; every request method falls back to those defaults unless the caller
    overrides them through keyword arguments. The static helpers cover cookie string
    parsing, Cache-Control handling and HTML charset detection.
    """

    # Instance defaults; __init__ only overwrites the ones it is given.
    _headers: Optional[dict] = None
    _cookies: Optional[Union[str, dict]] = None
    _proxies: Optional[dict] = None
    _timeout: int = 20
    _session: Optional[Session] = None

    # Matches an explicit UTF-8 charset declaration (HTTP header or HTML <meta> tag).
    _UTF8_CHARSET_RE = re.compile(r"charset=[\"']?utf-8[\"']?", re.IGNORECASE)

    def __init__(self,
                 headers: Optional[dict] = None,
                 ua: Optional[str] = None,
                 cookies: Optional[Union[str, dict]] = None,
                 proxies: Optional[dict] = None,
                 session: Optional[Session] = None,
                 timeout: Optional[int] = None,
                 referer: Optional[str] = None,
                 content_type: Optional[str] = None,
                 accept_type: Optional[str] = None):
        """
        Configure the defaults applied to every request made through this instance.

        :param headers: complete header dict; when given it is used as-is and
                        ``ua``, ``content_type``, ``accept_type`` and ``referer`` are ignored
        :param ua: User-Agent header value
        :param cookies: cookies as a dict, or as a ``"k=v; k2=v2"`` string that is parsed
        :param proxies: proxies mapping passed through to requests
        :param session: existing Session to send requests through; without one,
                        ``requests.request`` is used
        :param timeout: request timeout; falls back to the class default of 20
        :param referer: referer header value
        :param content_type: Content-Type header value; defaults to a
                             form-urlencoded UTF-8 content type
        :param accept_type: Accept header value
        """
        if not content_type:
            content_type = "application/x-www-form-urlencoded; charset=UTF-8"
        if headers:
            self._headers = headers
        else:
            self._headers = {
                "User-Agent": ua,
                "Content-Type": content_type,
                "Accept": accept_type,
                "referer": referer
            }
        if cookies:
            if isinstance(cookies, str):
                self._cookies = self.cookie_parse(cookies)
            else:
                self._cookies = cookies
        if proxies:
            self._proxies = proxies
        if session:
            self._session = session
        if timeout:
            self._timeout = timeout

    def request(self, method: str, url: str, raise_exception: bool = False, **kwargs) -> Optional[Response]:
        """
        Send an HTTP request.

        Instance defaults (headers, cookies, proxies, timeout) plus ``verify=False``
        and ``stream=False`` are applied only for keys the caller did not pass.

        :param method: HTTP method, e.g. get, post, put
        :param url: request URL
        :param raise_exception: re-raise request errors instead of swallowing them
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException was swallowed
        :raises: requests.exceptions.RequestException only when raise_exception is True
        """
        if self._session is None:
            req_method = requests.request
        else:
            req_method = self._session.request
        kwargs.setdefault("headers", self._headers)
        kwargs.setdefault("cookies", self._cookies)
        kwargs.setdefault("proxies", self._proxies)
        kwargs.setdefault("timeout", self._timeout)
        kwargs.setdefault("verify", False)
        kwargs.setdefault("stream", False)
        try:
            return req_method(method, url, **kwargs)
        except requests.exceptions.RequestException as e:
            logger.debug(f"请求失败: {e}")
            if raise_exception:
                raise
            return None

    def get(self, url: str, params: Optional[dict] = None, **kwargs) -> Optional[str]:
        """
        Send a GET request and return the body decoded as UTF-8.

        :param url: request URL
        :param params: query parameters
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the decoded body, or None when the request failed. The response is
                 tested for truthiness, and a requests Response is falsy for
                 error status codes, so those also yield None.
        """
        response = self.request(method="get", url=url, params=params, **kwargs)
        return str(response.content, "utf-8") if response else None

    def post(self, url: str, data: Any = None, json: Optional[dict] = None, **kwargs) -> Optional[Response]:
        """
        Send a POST request.

        :param url: request URL
        :param data: request body data
        :param json: JSON payload; an empty dict is sent when omitted
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException occurred
        """
        if json is None:
            json = {}
        return self.request(method="post", url=url, data=data, json=json, **kwargs)

    def put(self, url: str, data: Any = None, **kwargs) -> Optional[Response]:
        """
        Send a PUT request.

        :param url: request URL
        :param data: request body data
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException occurred
        """
        return self.request(method="put", url=url, data=data, **kwargs)

    def get_res(self,
                url: str,
                params: Optional[dict] = None,
                data: Any = None,
                json: Optional[dict] = None,
                allow_redirects: bool = True,
                raise_exception: bool = False,
                **kwargs) -> Optional[Response]:
        """
        Send a GET request and return the response object.

        :param url: request URL
        :param params: query parameters
        :param data: request body data
        :param json: JSON payload
        :param allow_redirects: whether redirects are followed
        :param raise_exception: re-raise request errors instead of returning None
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException was swallowed
        :raises: requests.exceptions.RequestException only when raise_exception is True
        """
        return self.request(method="get",
                            url=url,
                            params=params,
                            data=data,
                            json=json,
                            allow_redirects=allow_redirects,
                            raise_exception=raise_exception,
                            **kwargs)

    def post_res(self,
                 url: str,
                 data: Any = None,
                 params: Optional[dict] = None,
                 allow_redirects: bool = True,
                 files: Any = None,
                 json: Optional[dict] = None,
                 raise_exception: bool = False,
                 **kwargs) -> Optional[Response]:
        """
        Send a POST request and return the response object.

        :param url: request URL
        :param data: request body data
        :param params: query parameters
        :param allow_redirects: whether redirects are followed
        :param files: files to upload
        :param json: JSON payload
        :param raise_exception: re-raise request errors instead of returning None
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException was swallowed
        :raises: requests.exceptions.RequestException only when raise_exception is True
        """
        return self.request(method="post",
                            url=url,
                            data=data,
                            params=params,
                            allow_redirects=allow_redirects,
                            files=files,
                            json=json,
                            raise_exception=raise_exception,
                            **kwargs)

    def put_res(self,
                url: str,
                data: Any = None,
                params: Optional[dict] = None,
                allow_redirects: bool = True,
                files: Any = None,
                json: Optional[dict] = None,
                raise_exception: bool = False,
                **kwargs) -> Optional[Response]:
        """
        Send a PUT request and return the response object.

        :param url: request URL
        :param data: request body data
        :param params: query parameters
        :param allow_redirects: whether redirects are followed
        :param files: files to upload
        :param json: JSON payload
        :param raise_exception: re-raise request errors instead of returning None
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException was swallowed
        :raises: requests.exceptions.RequestException only when raise_exception is True
        """
        return self.request(method="put",
                            url=url,
                            data=data,
                            params=params,
                            allow_redirects=allow_redirects,
                            files=files,
                            json=json,
                            raise_exception=raise_exception,
                            **kwargs)

    def delete_res(self,
                   url: str,
                   data: Any = None,
                   params: Optional[dict] = None,
                   allow_redirects: bool = True,
                   raise_exception: bool = False,
                   **kwargs) -> Optional[Response]:
        """
        Send a DELETE request and return the response object.

        :param url: request URL
        :param data: request body data
        :param params: query parameters
        :param allow_redirects: whether redirects are followed
        :param raise_exception: re-raise request errors instead of returning None
        :param kwargs: extra arguments for requests, e.g. headers, cookies, proxies
        :return: the response object, or None when a RequestException was swallowed
        :raises: requests.exceptions.RequestException only when raise_exception is True
        """
        return self.request(method="delete",
                            url=url,
                            data=data,
                            params=params,
                            allow_redirects=allow_redirects,
                            raise_exception=raise_exception,
                            **kwargs)

    @staticmethod
    def cookie_parse(cookies_str: str, array: bool = False) -> Union[list, dict]:
        """
        Parse a ``"k=v; k2=v2"`` cookie string into a dict or a list.

        Each pair is split on its first ``=`` only, so values that themselves
        contain ``=`` (e.g. base64 padding) are kept intact. Segments without
        ``=`` are skipped; a repeated name keeps its last value.

        :param cookies_str: cookie string
        :param array: return a list of ``{"name": ..., "value": ...}`` dicts instead of a dict
        :return: dict mapping name to value, or the list form when ``array`` is True
        """
        if not cookies_str:
            return [] if array else {}
        cookie_dict = {}
        for cookie in cookies_str.split(";"):
            name, sep, value = cookie.partition("=")
            if sep:
                cookie_dict[name.strip()] = value.strip()
        if array:
            return [{"name": k, "value": v} for k, v in cookie_dict.items()]
        return cookie_dict

    @staticmethod
    def parse_cache_control(header: str) -> Tuple[str, Optional[int]]:
        """
        Parse a Cache-Control header into a cache directive and a max-age.

        Directive names are matched case-insensitively. When several recognised
        directives are present, the last one wins.

        :param header: Cache-Control header value
        :return: ``(cache_directive, max_age)``; ``cache_directive`` is "" when no
                 recognised directive is found, ``max_age`` is None when it is
                 absent or not an integer
        """
        cache_directive = ""
        max_age = None
        if not header:
            return cache_directive, max_age
        known_directives = {"no-cache", "private", "public", "no-store", "must-revalidate"}
        for directive in (part.strip() for part in header.split(",")):
            lowered = directive.lower()
            if lowered.startswith("max-age"):
                try:
                    # A bare "max-age" without "=value" yields "" here and is logged.
                    max_age = int(lowered.partition("=")[2])
                except ValueError as e:
                    logger.debug(f"Invalid max-age directive in Cache-Control header: {directive}, {e}")
            elif lowered in known_directives:
                cache_directive = lowered
        return cache_directive, max_age

    @staticmethod
    def generate_cache_headers(etag: Optional[str], cache_control: Optional[str] = "public",
                               max_age: Optional[int] = 86400) -> dict:
        """
        Build the ETag and Cache-Control headers for an HTTP response.

        :param etag: ETag value; no ETag header is added when it is empty or None
        :param cache_control: Cache-Control directive such as "public" or "private"; defaults to "public"
        :param max_age: Cache-Control max-age in seconds; defaults to 86400 (one day)
        :return: dict of HTTP headers
        """
        cache_headers = {}
        if etag:
            cache_headers["ETag"] = etag
        if cache_control and max_age is not None:
            cache_headers["Cache-Control"] = f"{cache_control}, max-age={max_age}"
        elif cache_control:
            cache_headers["Cache-Control"] = cache_control
        elif max_age is not None:
            cache_headers["Cache-Control"] = f"max-age={max_age}"
        return cache_headers

    @staticmethod
    def detect_encoding_from_html_response(response: Response,
                                           performance_mode: bool = False,
                                           confidence_threshold: float = 0.8) -> Optional[str]:
        """
        Detect the character encoding of an HTML response.

        Compatibility mode (default) runs chardet first and then checks the BOM,
        the <meta> charset and the Content-Type header. Performance mode checks
        the header, BOM and <meta> charset first and only then runs chardet.
        Apart from the chardet result, only UTF-8 declarations are recognised.

        :param response: HTTP response object
        :param performance_mode: use performance mode; defaults to False (compatibility mode)
        :param confidence_threshold: chardet confidence required to trust its result; defaults to 0.8
        :return: the detected encoding; falls back to the low-confidence chardet
                 result, then to "utf-8"
        """
        fallback_encoding = None
        declares_utf8 = RequestUtils._UTF8_CHARSET_RE.search
        try:
            if not performance_mode:
                # 1. Analyse the body with chardet.
                detection = chardet.detect(response.content)
                # A missing or None confidence counts as 0 rather than aborting detection.
                if (detection.get("confidence") or 0) > confidence_threshold:
                    return detection.get("encoding")
                # Keep the low-confidence guess as a last resort.
                fallback_encoding = detection.get("encoding")
                # 2. Check for a UTF-8 BOM at the start of the body.
                if response.content[:3] == b"\xef\xbb\xbf":
                    return "utf-8"
                # 3. Look for a UTF-8 charset declaration (<meta> tag) in the body.
                if declares_utf8(response.text):
                    return "utf-8"
                # 4. Look for a UTF-8 charset in the Content-Type response header.
                content_type = response.headers.get("Content-Type", "")
                if declares_utf8(content_type):
                    return "utf-8"
            else:
                # 1. Look for a UTF-8 charset in the Content-Type response header.
                #    Extracting arbitrary charsets is not supported yet; only UTF-8 is.
                content_type = response.headers.get("Content-Type", "")
                if declares_utf8(content_type):
                    return "utf-8"
                # 2. Check for a UTF-8 BOM at the start of the body.
                if response.content[:3] == b"\xef\xbb\xbf":
                    return "utf-8"
                # 3. Look for a UTF-8 charset declaration (<meta> tag) in the body.
                if declares_utf8(response.text):
                    return "utf-8"
                # 4. Analyse the body with chardet.
                detection = chardet.detect(response.content)
                if (detection.get("confidence") or 0) > confidence_threshold:
                    return detection.get("encoding")
                # Keep the low-confidence guess as a last resort.
                fallback_encoding = detection.get("encoding")
            # 5. Nothing conclusive: trust chardet's guess even at low confidence,
            #    otherwise use the default charset.
            return fallback_encoding or "utf-8"
        except Exception as e:
            # Detection is best-effort; never let it break the caller.
            logger.debug(f"Error when detect_encoding_from_response: {str(e)}")
            return fallback_encoding or "utf-8"

    @staticmethod
    def get_decoded_html_content(response: Response,
                                 performance_mode: bool = False, confidence_threshold: float = 0.8) -> str:
        """
        Return the decoded text of an HTML response.

        :param response: HTTP response object; a falsy response (None, or one
                         that requests treats as not ok) yields ""
        :param performance_mode: use performance mode; defaults to False (compatibility mode)
        :param confidence_threshold: chardet confidence threshold; defaults to 0.8
        :return: the decoded response text
        """
        try:
            if not response:
                return ""
            if response.content:
                # 1. Work out the encoding.
                encoding = (RequestUtils.detect_encoding_from_html_response(response, performance_mode,
                                                                            confidence_threshold)
                            or response.apparent_encoding)
                # 2. Decode with the detected encoding.
                try:
                    return response.content.decode(encoding)
                except Exception as e:
                    logger.debug(f"Decoding failed, error message: {str(e)}")
                    # Decoding failed: fall back to requests' own apparent_encoding.
                    response.encoding = response.apparent_encoding
                    return response.text
            else:
                return response.text
        except Exception as e:
            logger.debug(f"Error when getting decoded content: {str(e)}")
            return response.text