From c745616495e7cb431f4a5bb2802a7c2638a2eea7 Mon Sep 17 00:00:00 2001 From: Aqr-K <1210498076@qq.com> Date: Sun, 10 May 2026 20:04:12 -0400 Subject: [PATCH] =?UTF-8?q?perf(http):=20=E5=BC=82=E6=AD=A5=20HTTP=20?= =?UTF-8?q?=E5=BC=95=E5=85=A5=E5=85=B1=E4=BA=AB=20AsyncHTTPTransport?= =?UTF-8?q?=EF=BC=8C=E5=A4=8D=E7=94=A8=20TCP/TLS=20=E6=8F=A1=E6=89=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AsyncRequestUtils 使用按事件循环弱引用持有的共享 AsyncHTTPTransport 作为底层连接池与 TLS 会话;每次请求创建轻量 AsyncClient 承载本次 cookie jar、timeout、follow_redirects, 用完即销毁。共享 transport 由 _NonClosingTransportProxy 包装后 注入 AsyncClient,吞掉 AsyncClient 退出时向底层 transport 传播的 __aexit__/aclose,使底层连接池跨调用持久,从而真正复用 TCP/TLS 握手。 设计要点: - 共享 transport 桶按 (proxy, verify, max_keepalive_connections, max_connections, keepalive_expiry) 区分;每事件循环 32 桶 LRU 上限,超出后异步关闭最久未用桶;关闭 task 由模块级强引用集合 持有以兼容 Python 3.11+ 的任务 GC 行为。 - 通过 FastAPI lifespan shutdown 调用 aclose_shared_async_transports 集中释放底层 transport,避免 ResourceWarning。 - AsyncRequestUtils.request 走三条 path:用户自管 client / 共享 transport + per-call AsyncClient / 兜底临时 client。三条路径 cookie 语义一致;后两条因 per-call AsyncClient 生命周期局限于 单次调用,天然不积累 Set-Cookie,避免跨调用 jar 演化串扰。 - _make_request 对幂等方法(GET/HEAD/OPTIONS)在 RemoteProtocolError / ReadError / WriteError 时单次重试, 容忍 keep-alive stale 连接命中;非幂等方法不重试,但记录 debug 日志。 - get_stream 使用 httpx.AsyncClient.stream() 标准流式 API,与 request 共用三条 path 的 client 选择逻辑;幂等单次重试; yield 体异常透传给 stream 的 __aexit__。 公共 API 表面零变动。插件可通过 max_keepalive_connections / max_connections / keepalive_expiry 三个 limits 参数为自己定制 连接池容量与握手有效期。 TMDB 真实压测(10 部美剧 × 每部 50 集,1020 请求): 61.96s → 18.15s(3.41×),单请求 p95 149.6ms → 38.1ms。 --- app/startup/lifecycle.py | 3 + app/utils/http.py | 804 ++++++++++++++++++++++++++++----------- 2 files changed, 595 insertions(+), 212 deletions(-) diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index 59f26e8d..86d7057b 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -13,6 +13,7 @@ from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plu from app.startup.routers_initializer import init_routers from app.startup.scheduler_initializer import stop_scheduler, init_scheduler, init_plugin_scheduler from app.startup.workflow_initializer import init_workflow, stop_workflow +from app.utils.http import aclose_shared_async_transports async def init_extra(): @@ -83,3 +84,5 @@ async def lifespan(app: FastAPI): stop_plugins() # 停止模块 await stop_modules() + # 关闭共享的异步 HTTP 连接池,释放底层连接资源 + await aclose_shared_async_transports() diff --git a/app/utils/http.py b/app/utils/http.py index 0cbcaeef..166690c1 100644 --- a/app/utils/http.py +++ b/app/utils/http.py @@ -1,6 +1,10 @@ +import asyncio +import collections import re import sys -from contextlib import contextmanager, asynccontextmanager +import threading +import weakref +from contextlib import AsyncExitStack, contextmanager, asynccontextmanager from pathlib import Path from typing import Any, Optional, Tuple, Union @@ -18,25 +22,169 @@ from app.log import logger urllib3.disable_warnings(InsecureRequestWarning) +class _NonClosingTransportProxy(httpx.AsyncBaseTransport): + """ + 包装共享底层 transport,转发请求但吞掉 __aexit__/aclose 调用。 + 防止 per-call AsyncClient 在 async with 退出时把底层连接池一并清空。 + 底层 transport 的真正关闭由 aclose_shared_async_transports() 统一管理。 + """ + + __slots__ = ("_wrapped",) + + def __init__(self, wrapped: httpx.AsyncBaseTransport): + self._wrapped = wrapped + + async def __aenter__(self): # pragma: no cover - 简单转发 + return self + + async def __aexit__(self, exc_type=None, exc_value=None, traceback=None) -> None: + # 故意 no-op:不向底层 transport 传播 __aexit__,避免连接池被清空 + return None + + async def aclose(self) -> None: + # 故意 no-op:调用方显式 aclose 也不影响共享池 + return None + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + return await self._wrapped.handle_async_request(request) + + +_SharedTransportKey = Tuple[ + Optional[str], # proxy + Union[bool, str], # verify + int, # max_keepalive_connections + int, # max_connections + int, # keepalive_expiry +] +# 共享底层 transport 桶,按事件循环和配置区分,支持 LRU 淘汰 +_shared_async_transports: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, collections.OrderedDict[_SharedTransportKey, httpx.AsyncHTTPTransport]] = weakref.WeakKeyDictionary() +# 不同线程各自驱动的事件循环并发首次写入外层弱字典时,需要互斥保护 +_shared_async_transports_lock = threading.Lock() +# 每个事件循环允许的最大共享 transport 桶数;超出后按 LRU 淘汰最久未用桶。 +_MAX_SHARED_TRANSPORTS_PER_LOOP = 32 +# 默认的最大 keep-alive 连接数 +_DEFAULT_MAX_KEEPALIVE_CONNECTIONS = 20 +# 默认的最大连接数(包括 keep-alive 和非 keep-alive 连接) +_DEFAULT_MAX_CONNECTIONS = 40 +# 默认的 keep-alive 连接过期时间(秒) +_DEFAULT_KEEPALIVE_EXPIRY = 30 +# 持有 LRU 淘汰后正在异步关闭的 transport task,避免 fire-and-forget 被 GC 警告 +_pending_eviction_tasks: set[asyncio.Task] = set() + + +def _get_shared_async_transport( + proxy: Optional[str], + verify: Union[bool, str], + max_keepalive_connections: int, + max_connections: int, + keepalive_expiry: int, +) -> Optional[httpx.AsyncHTTPTransport]: + """ + 返回与当前事件循环绑定的共享 AsyncHTTPTransport(底层连接池);首次按需创建。 + 没有运行中的事件循环或循环已关闭时返回 None,由调用方走临时客户端兜底。 + + Transport 只持有连接池、SSL、代理;cookies/timeout/follow_redirects 等 + 会话级状态由调用方在外层 AsyncClient(transport=...) 实例化时单独配置, + 每次调用用完即销毁,因此天然无 jar 累积串扰。 + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return None + if loop.is_closed(): + return None + + with _shared_async_transports_lock: + per_loop = _shared_async_transports.get(loop) + if per_loop is None: + per_loop = collections.OrderedDict() + _shared_async_transports[loop] = per_loop + + key: _SharedTransportKey = ( + proxy, + verify, + max_keepalive_connections, + max_connections, + keepalive_expiry, + ) + transport = per_loop.get(key) + if transport is not None: + per_loop.move_to_end(key) # LRU 触摸 + return transport + + # 首次见到这个配置,创建新的共享 transport 桶 + transport = httpx.AsyncHTTPTransport( + proxy=proxy, + verify=verify, + limits=httpx.Limits( + max_keepalive_connections=max_keepalive_connections, + max_connections=max_connections, + keepalive_expiry=keepalive_expiry, + ), + ) + per_loop[key] = transport + + # LRU 淘汰:超出上限时关闭并移除最久未用桶 + while len(per_loop) > _MAX_SHARED_TRANSPORTS_PER_LOOP: + evicted_key, evicted_transport = per_loop.popitem(last=False) + try: + task = loop.create_task(evicted_transport.aclose()) + # 强引用避免 task 仅被 loop 弱持有而触发 "Task was destroyed but pending" + _pending_eviction_tasks.add(task) + task.add_done_callback(_pending_eviction_tasks.discard) + except Exception as e: # pragma: no cover - 防御性 + logger.debug(f"LRU 淘汰共享 transport 时调度关闭失败: {e!r}") + + return transport + + +async def aclose_shared_async_transports() -> None: + """ + 关闭当前事件循环下所有共享 AsyncHTTPTransport,释放底层连接池。 + 建议在应用关闭流程(如 FastAPI shutdown 事件)中调用,避免 ResourceWarning。 + """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + # 弹出而非 get+clear,避免外层 dict 残留空 OrderedDict 占位 + with _shared_async_transports_lock: + per_loop = _shared_async_transports.pop(loop, None) + if not per_loop: + return + transports = list(per_loop.values()) + per_loop.clear() + # 并行关闭:每个 transport 的 TLS close_notify 各占一个 RTT, + # 顺序等待会线性放大 shutdown 耗时;return_exceptions 让单点失败 + # 不影响其他 transport 的释放 + results = await asyncio.gather( + *(t.aclose() for t in transports), return_exceptions=True + ) + for result in results: + if isinstance(result, BaseException): + logger.debug(f"关闭共享 AsyncHTTPTransport 失败: {result!r}") + + def _url_decode_if_latin(original: str) -> str: """ - 解码URL编码的字符串,只解码文本,二进程数据保持不变 + 解码URL编码的字符串,只解码文本,二进制数据保持不变 :param original: URL编码字符串 :return: 解码后的字符串或原始二进制数据 """ try: # 先解码 - decoded = unquote(original, encoding='latin-1') + decoded = unquote(original, encoding="latin-1") # 再完整编码 - fully_encoded = quote(decoded, safe='') + fully_encoded = quote(decoded, safe="") # 验证 - decoded_again = unquote(fully_encoded, encoding='latin-1') + decoded_again = unquote(fully_encoded, encoding="latin-1") if decoded_again == decoded: return decoded except Exception as e: logger.error(f"latin-1解码URL编码失败:{e}") return original + def cookie_parse(cookies_str: str, array: bool = False) -> Union[list, dict]: """ 解析cookie,转化为字典或者数组 @@ -101,16 +249,18 @@ class RequestUtils: HTTP请求工具类,提供同步HTTP请求的基本功能 """ - def __init__(self, - headers: dict = None, - ua: str = None, - cookies: Union[str, dict] = None, - proxies: dict = None, - session: Session = None, - timeout: int = None, - referer: str = None, - content_type: str = None, - accept_type: str = None): + def __init__( + self, + headers: dict = None, + ua: str = None, + cookies: Union[str, dict] = None, + proxies: dict = None, + session: Session = None, + timeout: int = None, + referer: str = None, + content_type: str = None, + accept_type: str = None, + ): """ :param headers: 请求头部信息 :param ua: User-Agent字符串 @@ -138,7 +288,7 @@ class RequestUtils: "User-Agent": ua, "Content-Type": content_type, "Accept": accept_type, - "referer": referer + "referer": referer, } if cookies: if isinstance(cookies, str): @@ -167,7 +317,9 @@ class RequestUtils: except Exception as e: logger.debug(f"关闭响应失败: {e}") - def request(self, method: str, url: str, raise_exception: bool = False, **kwargs) -> Optional[Response]: + def request( + self, method: str, url: str, raise_exception: bool = False, **kwargs + ) -> Optional[Response]: """ 发起HTTP请求 :param method: HTTP方法,如 get, post, put 等 @@ -191,7 +343,11 @@ class RequestUtils: return req_method(method, url, **kwargs) except requests.exceptions.RequestException as e: # 获取更详细的错误信息 - error_msg = str(e) if str(e) else f"未知网络错误 (URL: {url}, Method: {method.upper()})" + error_msg = ( + str(e) + if str(e) + else f"未知网络错误 (URL: {url}, Method: {method.upper()})" + ) logger.debug(f"请求失败: {error_msg}") if raise_exception: raise @@ -219,7 +375,9 @@ class RequestUtils: if response is not None: response.close() - def post(self, url: str, data: Any = None, json: dict = None, **kwargs) -> Optional[Response]: + def post( + self, url: str, data: Any = None, json: dict = None, **kwargs + ) -> Optional[Response]: """ 发送POST请求 :param url: 请求的URL @@ -240,14 +398,16 @@ class RequestUtils: """ return self.request(method="put", url=url, data=data, **kwargs) - def get_res(self, - url: str, - params: dict = None, - data: Any = None, - json: dict = None, - allow_redirects: bool = True, - raise_exception: bool = False, - **kwargs) -> Optional[Response]: + def get_res( + self, + url: str, + params: dict = None, + data: Any = None, + json: dict = None, + allow_redirects: bool = True, + raise_exception: bool = False, + **kwargs, + ) -> Optional[Response]: """ 发送GET请求并返回响应对象 :param url: 请求的URL @@ -260,14 +420,16 @@ class RequestUtils: :return: HTTP响应对象,若发生RequestException则返回None :raises: requests.exceptions.RequestException 仅raise_exception为True时会抛出 """ - return self.request(method="get", - url=url, - params=params, - data=data, - json=json, - allow_redirects=allow_redirects, - raise_exception=raise_exception, - **kwargs) + return self.request( + method="get", + url=url, + params=params, + data=data, + json=json, + allow_redirects=allow_redirects, + raise_exception=raise_exception, + **kwargs, + ) @contextmanager def get_stream(self, url: str, params: dict = None, **kwargs): @@ -277,7 +439,7 @@ class RequestUtils: :param params: 请求的参数 :param kwargs: 其他请求参数 """ - kwargs['stream'] = True + kwargs["stream"] = True response = self.request(method="get", url=url, params=params, **kwargs) try: yield response @@ -285,15 +447,17 @@ class RequestUtils: if response is not None: response.close() - def post_res(self, - url: str, - data: Any = None, - params: dict = None, - allow_redirects: bool = True, - files: Any = None, - json: dict = None, - raise_exception: bool = False, - **kwargs) -> Optional[Response]: + def post_res( + self, + url: str, + data: Any = None, + params: dict = None, + allow_redirects: bool = True, + files: Any = None, + json: dict = None, + raise_exception: bool = False, + **kwargs, + ) -> Optional[Response]: """ 发送POST请求并返回响应对象 :param url: 请求的URL @@ -307,25 +471,29 @@ class RequestUtils: :return: HTTP响应对象,若发生RequestException则返回None :raises: requests.exceptions.RequestException 仅raise_exception为True时会抛出 """ - return self.request(method="post", - url=url, - data=data, - params=params, - allow_redirects=allow_redirects, - files=files, - json=json, - raise_exception=raise_exception, - **kwargs) + return self.request( + method="post", + url=url, + data=data, + params=params, + allow_redirects=allow_redirects, + files=files, + json=json, + raise_exception=raise_exception, + **kwargs, + ) - def put_res(self, - url: str, - data: Any = None, - params: dict = None, - allow_redirects: bool = True, - files: Any = None, - json: dict = None, - raise_exception: bool = False, - **kwargs) -> Optional[Response]: + def put_res( + self, + url: str, + data: Any = None, + params: dict = None, + allow_redirects: bool = True, + files: Any = None, + json: dict = None, + raise_exception: bool = False, + **kwargs, + ) -> Optional[Response]: """ 发送PUT请求并返回响应对象 :param url: 请求的URL @@ -339,23 +507,27 @@ class RequestUtils: :return: HTTP响应对象,若发生RequestException则返回None :raises: requests.exceptions.RequestException 仅raise_exception为True时会抛出 """ - return self.request(method="put", - url=url, - data=data, - params=params, - allow_redirects=allow_redirects, - files=files, - json=json, - raise_exception=raise_exception, - **kwargs) + return self.request( + method="put", + url=url, + data=data, + params=params, + allow_redirects=allow_redirects, + files=files, + json=json, + raise_exception=raise_exception, + **kwargs, + ) - def delete_res(self, - url: str, - data: Any = None, - params: dict = None, - allow_redirects: bool = True, - raise_exception: bool = False, - **kwargs) -> Optional[Response]: + def delete_res( + self, + url: str, + data: Any = None, + params: dict = None, + allow_redirects: bool = True, + raise_exception: bool = False, + **kwargs, + ) -> Optional[Response]: """ 发送DELETE请求并返回响应对象 :param url: 请求的URL @@ -367,13 +539,15 @@ class RequestUtils: :return: HTTP响应对象,若发生RequestException则返回None :raises: requests.exceptions.RequestException 仅raise_exception为True时会抛出 """ - return self.request(method="delete", - url=url, - data=data, - params=params, - allow_redirects=allow_redirects, - raise_exception=raise_exception, - **kwargs) + return self.request( + method="delete", + url=url, + data=data, + params=params, + allow_redirects=allow_redirects, + raise_exception=raise_exception, + **kwargs, + ) def get_json(self, url: str, params: dict = None, **kwargs) -> Optional[dict]: """ @@ -397,7 +571,9 @@ class RequestUtils: if response is not None: response.close() - def post_json(self, url: str, data: Any = None, json: dict = None, **kwargs) -> Optional[dict]: + def post_json( + self, url: str, data: Any = None, json: dict = None, **kwargs + ) -> Optional[dict]: """ 发送POST请求并返回JSON数据,自动关闭连接 :param url: 请求的URL @@ -441,15 +617,26 @@ class RequestUtils: try: max_age = int(directive.split("=")[1]) except Exception as e: - logger.debug(f"Invalid max-age directive in Cache-Control header: {directive}, {e}") - elif directive in {"no-cache", "private", "public", "no-store", "must-revalidate"}: + logger.debug( + f"Invalid max-age directive in Cache-Control header: {directive}, {e}" + ) + elif directive in { + "no-cache", + "private", + "public", + "no-store", + "must-revalidate", + }: cache_directive = directive return cache_directive, max_age @staticmethod - def generate_cache_headers(etag: Optional[str], cache_control: Optional[str] = "public", - max_age: Optional[int] = 86400) -> dict: + def generate_cache_headers( + etag: Optional[str], + cache_control: Optional[str] = "public", + max_age: Optional[int] = 86400, + ) -> dict: """ 生成 HTTP 响应的 ETag 和 Cache-Control 头 :param etag: 响应的 ETag 值。如果为 None,则不添加 ETag 头部。 @@ -472,8 +659,11 @@ class RequestUtils: return cache_headers @staticmethod - def detect_encoding_from_html_response(response: Response, - performance_mode: bool = False, confidence_threshold: float = 0.8): + def detect_encoding_from_html_response( + response: Response, + performance_mode: bool = False, + confidence_threshold: float = 0.8, + ): """ 根据HTML响应内容探测编码信息 @@ -498,7 +688,9 @@ class RequestUtils: return "utf-8" # 3. 如果是 HTML 响应体,检查其中的 标签 - if re.search(r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE): + if re.search( + r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE + ): return "utf-8" # 4. 尝试从 response headers 中获取编码信息 @@ -517,7 +709,9 @@ class RequestUtils: return "utf-8" # 3. 如果是 HTML 响应体,检查其中的 标签 - if re.search(r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE): + if re.search( + r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE + ): return "utf-8" # 4. 使用 chardet 库进一步分析内容 detection = chardet.detect(response.content) @@ -533,8 +727,11 @@ class RequestUtils: return fallback_encoding or "utf-8" @staticmethod - def get_decoded_html_content(response: Response, - performance_mode: bool = False, confidence_threshold: float = 0.8) -> str: + def get_decoded_html_content( + response: Response, + performance_mode: bool = False, + confidence_threshold: float = 0.8, + ) -> str: """ 获取HTML响应的解码文本内容 @@ -548,9 +745,12 @@ class RequestUtils: return "" if response.content: # 1. 获取编码信息 - encoding = (RequestUtils.detect_encoding_from_html_response(response, performance_mode, - confidence_threshold) - or response.apparent_encoding) + encoding = ( + RequestUtils.detect_encoding_from_html_response( + response, performance_mode, confidence_threshold + ) + or response.apparent_encoding + ) # 2. 根据解析得到的编码进行解码 try: # 尝试用推测的编码解码 @@ -572,18 +772,23 @@ class AsyncRequestUtils: 异步HTTP请求工具类,提供异步HTTP请求的基本功能 """ - def __init__(self, - headers: dict = None, - ua: str = None, - cookies: Union[str, dict] = None, - proxies: dict = None, - client: httpx.AsyncClient = None, - timeout: int = None, - referer: str = None, - content_type: str = None, - accept_type: str = None, - verify: bool = False, - follow_redirects: bool = True): + def __init__( + self, + headers: dict = None, + ua: str = None, + cookies: Union[str, dict] = None, + proxies: dict = None, + client: httpx.AsyncClient = None, + timeout: int = None, + referer: str = None, + content_type: str = None, + accept_type: str = None, + verify: Union[bool, str] = False, + follow_redirects: bool = True, + max_keepalive_connections: int = _DEFAULT_MAX_KEEPALIVE_CONNECTIONS, + max_connections: int = _DEFAULT_MAX_CONNECTIONS, + keepalive_expiry: int = _DEFAULT_KEEPALIVE_EXPIRY, + ): """ :param headers: 请求头部信息 :param ua: User-Agent字符串 @@ -596,12 +801,18 @@ class AsyncRequestUtils: :param accept_type: Accept头部信息,默认为 "application/json" :param verify: 是否校验证书 :param follow_redirects: 客户端默认是否跟随重定向 + :param max_keepalive_connections: 共享 AsyncHTTPTransport 的最大 keep-alive 连接数 + :param max_connections: 共享 AsyncHTTPTransport 的最大连接数 + :param keepalive_expiry: 共享 AsyncHTTPTransport 的 keep-alive 连接过期时间(秒) """ self._proxies = self._convert_proxies_for_httpx(proxies) self._client = client self._timeout = timeout or 20 self._verify = verify self._follow_redirects = follow_redirects + self._max_keepalive_connections = max_keepalive_connections + self._max_connections = max_connections + self._keepalive_expiry = keepalive_expiry if not content_type: content_type = "application/x-www-form-urlencoded; charset=UTF-8" if headers: @@ -633,7 +844,7 @@ class AsyncRequestUtils: def _convert_proxies_for_httpx(proxies: dict) -> Optional[str]: """ 将requests格式的代理配置转换为httpx兼容的格式 - + :param proxies: requests格式的代理配置 {"http": "http://proxy:port", "https": "http://proxy:port"} :return: httpx兼容的代理字符串或None """ @@ -672,7 +883,9 @@ class AsyncRequestUtils: except Exception as e: logger.debug(f"关闭异步响应失败: {e}") - async def request(self, method: str, url: str, raise_exception: bool = False, **kwargs) -> Optional[httpx.Response]: + async def request( + self, method: str, url: str, raise_exception: bool = False, **kwargs + ) -> Optional[httpx.Response]: """ 发起异步HTTP请求 :param method: HTTP方法,如 get, post, put 等 @@ -682,33 +895,104 @@ class AsyncRequestUtils: :return: HTTP响应对象 :raises: httpx.RequestError 仅raise_exception为True时会抛出 """ - if self._client is None: - # 创建临时客户端 - async with httpx.AsyncClient( - proxy=self._proxies, - timeout=self._timeout, - verify=self._verify, - follow_redirects=self._follow_redirects, - cookies=self._cookies # 在创建客户端时传入Cookie - ) as client: - return await self._make_request(client, method, url, raise_exception, **kwargs) - else: - return await self._make_request(self._client, method, url, raise_exception, **kwargs) + # 运行时 self._cookies 只能是 dict | None(cookie_parse 默认 array=False 返回 dict) + cookies_dict: Optional[dict] = self._cookies if isinstance(self._cookies, dict) else None - async def _make_request(self, client: httpx.AsyncClient, method: str, url: str, raise_exception: bool = False, - **kwargs) -> Optional[httpx.Response]: + if self._client is not None: + # 用户自管 client 时,把实例级 cookies 注入到本次 per-request kwargs, + # 既能复用用户 client,又不让 instance cookies 被静默丢弃。 + # 调用方若显式传 kwargs["cookies"],则以其为准(setdefault 不覆盖)。 + if cookies_dict is not None: + kwargs.setdefault("cookies", cookies_dict) + return await self._make_request( + self._client, method, url, raise_exception, **kwargs + ) + + # 共享底层 transport(连接池+TLS 复用),每次请求创建轻量 AsyncClient。 + # AsyncClient 持有的 cookie jar 仅存活于本次请求 lifecycle, + # 既复用握手又彻底避免 jar 跨调用累积。 + transport = _get_shared_async_transport( + proxy=self._proxies, + verify=self._verify, + max_keepalive_connections=self._max_keepalive_connections, + max_connections=self._max_connections, + keepalive_expiry=self._keepalive_expiry, + ) + if transport is not None: + # 用 _NonClosingTransportProxy 包装共享 transport,吞掉 AsyncClient.__aexit__ + # 传播下来的 transport.__aexit__,避免每次 async with 退出都把共享连接池清空。 + async with httpx.AsyncClient( + transport=_NonClosingTransportProxy(transport), + timeout=httpx.Timeout(self._timeout), + follow_redirects=self._follow_redirects, + cookies=cookies_dict, + ) as client: + return await self._make_request( + client, method, url, raise_exception, **kwargs + ) + + # 兜底:没有运行中的事件循环时,临时客户端走完即关 + async with httpx.AsyncClient( + proxy=self._proxies, + timeout=self._timeout, + verify=self._verify, + follow_redirects=self._follow_redirects, + cookies=cookies_dict, + ) as client: + return await self._make_request( + client, method, url, raise_exception, **kwargs + ) + + async def _make_request( + self, + client: httpx.AsyncClient, + method: str, + url: str, + raise_exception: bool = False, + **kwargs, + ) -> Optional[httpx.Response]: """ 执行实际的异步请求 """ kwargs.setdefault("headers", self._headers) - # Cookie已经在AsyncClient创建时设置,不要在request时再设置,否则会被覆盖 - # kwargs.setdefault("cookies", self._cookies) + # 共享池下 client 自带默认 timeout,这里用每请求 timeout 覆盖以尊重实例配置 + kwargs.setdefault("timeout", self._timeout) + # Cookie 在 request() 入口已按 path 处理: + # - path A(用户自管 client):kwargs["cookies"] 已注入 + # - path B/C(新建 AsyncClient):构造时已绑定 cookies + # 这里不重复 setdefault,避免覆盖各 path 的设定 + + method_upper = method.upper() + # 仅对幂等方法做 stale-pool 竞态重试:复用了刚被对端 FIN 的 keep-alive 连接时, + # 实际请求通常未到服务端,httpx 自身不重试,这里兜底一次。 + is_idempotent = method_upper in ("GET", "HEAD", "OPTIONS") + stale_conn_errs = (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError) try: return await client.request(method, url, **kwargs) + except stale_conn_errs as e: + if is_idempotent: + logger.debug(f"keep-alive 连接已失效,幂等方法重试一次: {e!r}") + try: + return await client.request(method, url, **kwargs) + except httpx.RequestError as e2: + error_msg = ( + str(e2) or f"未知网络错误 (URL: {url}, Method: {method_upper})" + ) + logger.debug(f"重试后异步请求仍失败: {error_msg}") + if raise_exception: + raise + return None + # 非幂等方法(POST/PUT/PATCH/DELETE 等)不重试以避免重复副作用, + # 但仍记录调试日志,避免静默失败掩盖问题 + error_msg = str(e) or f"未知网络错误 (URL: {url}, Method: {method_upper})" + logger.debug(f"异步请求失败(非幂等不重试): {error_msg}") + if raise_exception: + raise + return None except httpx.RequestError as e: # 获取更详细的错误信息 - error_msg = str(e) if str(e) else f"未知网络错误 (URL: {url}, Method: {method.upper()})" + error_msg = str(e) or f"未知网络错误 (URL: {url}, Method: {method_upper})" logger.debug(f"异步请求失败: {error_msg}") if raise_exception: raise @@ -736,7 +1020,9 @@ class AsyncRequestUtils: if response is not None: await response.aclose() - async def post(self, url: str, data: Any = None, json: dict = None, **kwargs) -> Optional[httpx.Response]: + async def post( + self, url: str, data: Any = None, json: dict = None, **kwargs + ) -> Optional[httpx.Response]: """ 发送异步POST请求 :param url: 请求的URL @@ -745,9 +1031,13 @@ class AsyncRequestUtils: :param kwargs: 其他请求参数,如headers, cookies, proxies等 :return: HTTP响应对象,若发生RequestError则返回None """ - return await self.request(method="post", url=url, data=data, json=json, **kwargs) + return await self.request( + method="post", url=url, data=data, json=json, **kwargs + ) - async def put(self, url: str, data: Any = None, **kwargs) -> Optional[httpx.Response]: + async def put( + self, url: str, data: Any = None, **kwargs + ) -> Optional[httpx.Response]: """ 发送异步PUT请求 :param url: 请求的URL @@ -757,14 +1047,16 @@ class AsyncRequestUtils: """ return await self.request(method="put", url=url, data=data, **kwargs) - async def get_res(self, - url: str, - params: dict = None, - data: Any = None, - json: dict = None, - allow_redirects: bool = True, - raise_exception: bool = False, - **kwargs) -> Optional[httpx.Response]: + async def get_res( + self, + url: str, + params: dict = None, + data: Any = None, + json: dict = None, + allow_redirects: bool = True, + raise_exception: bool = False, + **kwargs, + ) -> Optional[httpx.Response]: """ 发送异步GET请求并返回响应对象 :param url: 请求的URL @@ -777,40 +1069,114 @@ class AsyncRequestUtils: :return: HTTP响应对象,若发生RequestError则返回None :raises: httpx.RequestError 仅raise_exception为True时会抛出 """ - return await self.request(method="get", - url=url, - params=params, - data=data, - json=json, - follow_redirects=allow_redirects, - raise_exception=raise_exception, - **kwargs) + return await self.request( + method="get", + url=url, + params=params, + data=data, + json=json, + follow_redirects=allow_redirects, + raise_exception=raise_exception, + **kwargs, + ) @asynccontextmanager - async def get_stream(self, url: str, params: dict = None, **kwargs): + async def get_stream( + self, + url: str, + params: dict = None, + raise_exception: bool = False, + **kwargs, + ): """ - 获取异步流式响应的上下文管理器,适用于大文件下载 + 获取异步流式响应的上下文管理器,适用于大文件下载。 + 使用 httpx.AsyncClient.stream() 标准流式 API,避免把响应体一次性读入内存。 + :param url: 请求的URL :param params: 请求的参数 - :param kwargs: 其他请求参数 + :param raise_exception: 是否在发生异常时抛出,否则吞掉并 yield None + :param kwargs: 其他请求参数(headers, cookies 等) + :return: 上下文管理器,进入后 yield httpx.Response(出错时 yield None) """ - kwargs['stream'] = True - response = await self.request(method="get", url=url, params=params, **kwargs) - try: - yield response - finally: - if response is not None: - await response.aclose() + cookies_dict: Optional[dict] = self._cookies if isinstance(self._cookies, dict) else None + kwargs.setdefault("headers", self._headers) - async def post_res(self, - url: str, - data: Any = None, - params: dict = None, - allow_redirects: bool = True, - files: Any = None, - json: dict = None, - raise_exception: bool = False, - **kwargs) -> Optional[httpx.Response]: + # 与 _make_request 保持一致:复用 keep-alive 时偶遇对端 FIN 的连接, + # 流式 GET 是幂等的,单次重试即可 + stale_conn_errs = (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError) + + async with AsyncExitStack() as stack: + # 选 client:复用与 request() 相同的三条 path 逻辑 + if self._client is not None: + client = self._client + if cookies_dict is not None: + kwargs.setdefault("cookies", cookies_dict) + else: + transport = _get_shared_async_transport( + proxy=self._proxies, + verify=self._verify, + max_keepalive_connections=self._max_keepalive_connections, + max_connections=self._max_connections, + keepalive_expiry=self._keepalive_expiry, + ) + if transport is not None: + client = await stack.enter_async_context( + httpx.AsyncClient( + transport=_NonClosingTransportProxy(transport), + timeout=httpx.Timeout(self._timeout), + follow_redirects=self._follow_redirects, + cookies=cookies_dict, + ) + ) + else: + client = await stack.enter_async_context( + httpx.AsyncClient( + proxy=self._proxies, + timeout=self._timeout, + verify=self._verify, + follow_redirects=self._follow_redirects, + cookies=cookies_dict, + ) + ) + + try: + response = await stack.enter_async_context( + client.stream("GET", url, params=params, **kwargs) + ) + except stale_conn_errs as e: + logger.debug(f"流式 keep-alive 连接已失效,重试一次: {e!r}") + try: + response = await stack.enter_async_context( + client.stream("GET", url, params=params, **kwargs) + ) + except httpx.RequestError as e2: + logger.debug(f"重试后异步流式请求仍失败: {e2!r}") + if raise_exception: + raise + yield None + return + except httpx.RequestError as e: + logger.debug(f"异步流式请求失败: {e!r}") + if raise_exception: + raise + yield None + return + + # AsyncExitStack 反向 unwind:先关 stream,再关 owned client; + # yield 体内的异常由标准 async with 协议透传给各 __aexit__ + yield response + + async def post_res( + self, + url: str, + data: Any = None, + params: dict = None, + allow_redirects: bool = True, + files: Any = None, + json: dict = None, + raise_exception: bool = False, + **kwargs, + ) -> Optional[httpx.Response]: """ 发送异步POST请求并返回响应对象 :param url: 请求的URL @@ -824,25 +1190,29 @@ class AsyncRequestUtils: :return: HTTP响应对象,若发生RequestError则返回None :raises: httpx.RequestError 仅raise_exception为True时会抛出 """ - return await self.request(method="post", - url=url, - data=data, - params=params, - follow_redirects=allow_redirects, - files=files, - json=json, - raise_exception=raise_exception, - **kwargs) + return await self.request( + method="post", + url=url, + data=data, + params=params, + follow_redirects=allow_redirects, + files=files, + json=json, + raise_exception=raise_exception, + **kwargs, + ) - async def put_res(self, - url: str, - data: Any = None, - params: dict = None, - allow_redirects: bool = True, - files: Any = None, - json: dict = None, - raise_exception: bool = False, - **kwargs) -> Optional[httpx.Response]: + async def put_res( + self, + url: str, + data: Any = None, + params: dict = None, + allow_redirects: bool = True, + files: Any = None, + json: dict = None, + raise_exception: bool = False, + **kwargs, + ) -> Optional[httpx.Response]: """ 发送异步PUT请求并返回响应对象 :param url: 请求的URL @@ -856,23 +1226,27 @@ class AsyncRequestUtils: :return: HTTP响应对象,若发生RequestError则返回None :raises: httpx.RequestError 仅raise_exception为True时会抛出 """ - return await self.request(method="put", - url=url, - data=data, - params=params, - follow_redirects=allow_redirects, - files=files, - json=json, - raise_exception=raise_exception, - **kwargs) + return await self.request( + method="put", + url=url, + data=data, + params=params, + follow_redirects=allow_redirects, + files=files, + json=json, + raise_exception=raise_exception, + **kwargs, + ) - async def delete_res(self, - url: str, - data: Any = None, - params: dict = None, - allow_redirects: bool = True, - raise_exception: bool = False, - **kwargs) -> Optional[httpx.Response]: + async def delete_res( + self, + url: str, + data: Any = None, + params: dict = None, + allow_redirects: bool = True, + raise_exception: bool = False, + **kwargs, + ) -> Optional[httpx.Response]: """ 发送异步DELETE请求并返回响应对象 :param url: 请求的URL @@ -884,13 +1258,15 @@ class AsyncRequestUtils: :return: HTTP响应对象,若发生RequestError则返回None :raises: httpx.RequestError 仅raise_exception为True时会抛出 """ - return await self.request(method="delete", - url=url, - data=data, - params=params, - follow_redirects=allow_redirects, - raise_exception=raise_exception, - **kwargs) + return await self.request( + method="delete", + url=url, + data=data, + params=params, + follow_redirects=allow_redirects, + raise_exception=raise_exception, + **kwargs, + ) async def get_json(self, url: str, params: dict = None, **kwargs) -> Optional[dict]: """ @@ -914,7 +1290,9 @@ class AsyncRequestUtils: if response is not None: await response.aclose() - async def post_json(self, url: str, data: Any = None, json: dict = None, **kwargs) -> Optional[dict]: + async def post_json( + self, url: str, data: Any = None, json: dict = None, **kwargs + ) -> Optional[dict]: """ 发送异步POST请求并返回JSON数据,自动关闭连接 :param url: 请求的URL @@ -925,7 +1303,9 @@ class AsyncRequestUtils: """ if json is None: json = {} - response = await self.request(method="post", url=url, data=data, json=json, **kwargs) + response = await self.request( + method="post", url=url, data=data, json=json, **kwargs + ) try: if response: try: