mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-12 10:57:16 +08:00
AsyncRequestUtils 使用按事件循环弱引用持有的共享 AsyncHTTPTransport 作为底层连接池与 TLS 会话;每次请求创建轻量 AsyncClient 承载本次 cookie jar、timeout、follow_redirects, 用完即销毁。共享 transport 由 _NonClosingTransportProxy 包装后 注入 AsyncClient,吞掉 AsyncClient 退出时向底层 transport 传播的 __aexit__/aclose,使底层连接池跨调用持久,从而真正复用 TCP/TLS 握手。 设计要点: - 共享 transport 桶按 (proxy, verify, max_keepalive_connections, max_connections, keepalive_expiry) 区分;每事件循环 32 桶 LRU 上限,超出后异步关闭最久未用桶;关闭 task 由模块级强引用集合 持有以兼容 Python 3.11+ 的任务 GC 行为。 - 通过 FastAPI lifespan shutdown 调用 aclose_shared_async_transports 集中释放底层 transport,避免 ResourceWarning。 - AsyncRequestUtils.request 走三条 path:用户自管 client / 共享 transport + per-call AsyncClient / 兜底临时 client。三条路径 cookie 语义一致;后两条因 per-call AsyncClient 生命周期局限于 单次调用,天然不积累 Set-Cookie,避免跨调用 jar 演化串扰。 - _make_request 对幂等方法(GET/HEAD/OPTIONS)在 RemoteProtocolError / ReadError / WriteError 时单次重试, 容忍 keep-alive stale 连接命中;非幂等方法不重试,但记录 debug 日志。 - get_stream 使用 httpx.AsyncClient.stream() 标准流式 API,与 request 共用三条 path 的 client 选择逻辑;幂等单次重试; yield 体异常透传给 stream 的 __aexit__。 公共 API 表面零变动。插件可通过 max_keepalive_connections / max_connections / keepalive_expiry 三个 limits 参数为自己定制 连接池容量与握手有效期。 TMDB 真实压测(10 部美剧 × 每部 50 集,1020 请求): 61.96s → 18.15s(3.41×),单请求 p95 149.6ms → 38.1ms。
89 lines
2.7 KiB
Python
89 lines
2.7 KiB
Python
import asyncio
|
||
from contextlib import asynccontextmanager
|
||
|
||
from fastapi import FastAPI
|
||
|
||
from app.chain.system import SystemChain
|
||
from app.core.config import global_vars
|
||
from app.helper.system import SystemHelper
|
||
from app.startup.command_initializer import init_command, stop_command, restart_command
|
||
from app.startup.modules_initializer import init_modules, stop_modules
|
||
from app.startup.monitor_initializer import stop_monitor, init_monitor
|
||
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
|
||
from app.startup.routers_initializer import init_routers
|
||
from app.startup.scheduler_initializer import stop_scheduler, init_scheduler, init_plugin_scheduler
|
||
from app.startup.workflow_initializer import init_workflow, stop_workflow
|
||
from app.utils.http import aclose_shared_async_transports
|
||
|
||
|
||
async def init_extra():
|
||
"""
|
||
同步插件及重启相关依赖服务
|
||
"""
|
||
if await sync_plugins():
|
||
# 重新注册插件定时服务
|
||
init_plugin_scheduler()
|
||
# 重新注册命令
|
||
restart_command()
|
||
# 设置系统已修改标志
|
||
SystemHelper().set_system_modified()
|
||
# 重启完成
|
||
SystemChain().restart_finish()
|
||
|
||
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
"""
|
||
定义应用的生命周期事件
|
||
"""
|
||
print("Starting up...")
|
||
# 存储当前循环
|
||
global_vars.set_loop(asyncio.get_event_loop())
|
||
# 初始化路由
|
||
init_routers(app)
|
||
# 初始化模块
|
||
init_modules()
|
||
# 恢复插件备份
|
||
SystemChain().restore_plugins()
|
||
# 初始化插件
|
||
init_plugins()
|
||
# 初始化定时器
|
||
init_scheduler()
|
||
# 初始化监控器
|
||
init_monitor()
|
||
# 初始化命令
|
||
init_command()
|
||
# 初始化工作流
|
||
init_workflow()
|
||
# 插件同步到本地
|
||
sync_plugins_task = asyncio.create_task(init_extra())
|
||
try:
|
||
# 在此处 yield,表示应用已经启动,控制权交回 FastAPI 主事件循环
|
||
yield
|
||
finally:
|
||
print("Shutting down...")
|
||
# 取消同步插件任务
|
||
try:
|
||
sync_plugins_task.cancel()
|
||
await sync_plugins_task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
except Exception as e:
|
||
print(str(e))
|
||
# 备份插件
|
||
SystemChain().backup_plugins()
|
||
# 停止工作流
|
||
stop_workflow()
|
||
# 停止命令
|
||
stop_command()
|
||
# 停止监控器
|
||
stop_monitor()
|
||
# 停止定时器
|
||
stop_scheduler()
|
||
# 停止插件
|
||
stop_plugins()
|
||
# 停止模块
|
||
await stop_modules()
|
||
# 关闭共享的异步 HTTP 连接池,释放底层连接资源
|
||
await aclose_shared_async_transports()
|