fix(security): require signed subtitle downloads (#6055)

This commit is contained in:
InfinityPacer
2026-07-05 09:43:58 +08:00
committed by GitHub
parent 95b6adbeee
commit d977e4c48a
4 changed files with 354 additions and 12 deletions

View File

@@ -9,14 +9,41 @@ from app.core.context import MediaInfo, Context, SubtitleInfo, TorrentInfo
from app.core.metainfo import MetaInfo
from app.core.security import verify_token
from app.db.models.user import User
from app.db.site_oper import SiteOper
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.helper.directory import DirectoryHelper
from app.schemas.types import SystemConfigKey
from app.utils.security import SecurityUtils
router = APIRouter()
def _prepare_subtitle_download(subtitle: SubtitleInfo) -> tuple[bool, str]:
"""
校验字幕下载签名,并用服务端站点配置覆盖请求凭据。
"""
if subtitle.site is None:
return False, "字幕站点信息为空"
clean_url = SecurityUtils.verify_signed_url(
subtitle.enclosure,
purpose=SecurityUtils.subtitle_download_purpose(subtitle.site),
)
if not clean_url:
return False, "字幕下载链接签名无效"
site = SiteOper().get(subtitle.site)
if not site:
return False, "字幕站点信息不存在"
subtitle.enclosure = clean_url
subtitle.site_cookie = site.cookie
subtitle.site_ua = site.ua
subtitle.site_proxy = bool(site.proxy)
return True, ""
@router.get("/", summary="正在下载", response_model=List[schemas.DownloaderTorrent])
def current(
name: Optional[str] = None, _: schemas.TokenPayload = Depends(verify_token)
@@ -127,6 +154,10 @@ def download_subtitle(
"""
subtitle_info = SubtitleInfo()
subtitle_info.from_dict(subtitle_in.model_dump())
valid, message = _prepare_subtitle_download(subtitle_info)
if not valid:
return schemas.Response(success=False, message=message)
success, message, saved_files = DownloadChain().download_subtitle(
subtitle=subtitle_info,
tmdbid=tmdbid,

View File

@@ -15,6 +15,7 @@ from app.core.security import verify_resource_token, verify_token
from app.log import logger
from app.schemas import MediaRecognizeConvertEventData
from app.schemas.types import MediaType, ChainEventType
from app.utils.security import SecurityUtils
router = APIRouter()
@@ -45,6 +46,49 @@ def _sse_event(data: dict) -> str:
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
def _serialize_signed_subtitle_result(subtitle: Any) -> dict:
"""
序列化字幕结果并签名下载链接,签名用途绑定站点 ID。
"""
data = subtitle.to_dict() if hasattr(subtitle, "to_dict") else dict(subtitle)
enclosure = data.get("enclosure")
if enclosure:
data["enclosure"] = SecurityUtils.sign_url(
enclosure,
purpose=SecurityUtils.subtitle_download_purpose(data.get("site")),
)
return data
def _serialize_signed_subtitle_results(subtitles: List[Any]) -> List[dict]:
"""
批量序列化字幕结果,确保返回给客户端的下载链接均已签名。
"""
return [_serialize_signed_subtitle_result(subtitle) for subtitle in subtitles]
def _sign_subtitle_search_event(event: dict) -> dict:
"""
签名字幕搜索流事件中的下载链接。
"""
signed_event = dict(event)
if "items" in signed_event:
signed_event["items"] = _serialize_signed_subtitle_results(
signed_event.get("items") or []
)
return signed_event
async def _iter_signed_subtitle_search_events(
event_source: AsyncIterator[dict],
) -> AsyncIterator[dict]:
"""
输出仅包含签名字幕下载链接的搜索流事件。
"""
async for event in event_source:
yield _sign_subtitle_search_event(event)
def _merge_append_event(pending_event: Optional[dict], event: dict) -> dict:
"""
合并短时间内连续到达的 append 事件,降低前端刷新频率。
@@ -168,7 +212,9 @@ async def search_latest_context(_: schemas.TokenPayload = Depends(verify_token))
success=True,
data={
"params": params,
"results": [result.to_dict() for result in results],
"results": _serialize_signed_subtitle_results(results)
if params.get("result_type") == "subtitle"
else [result.to_dict() for result in results],
},
)
@@ -625,7 +671,11 @@ async def search_subtitle_by_title_stream(
title=keyword, page=page, sites=_parse_site_list(sites), cache_local=True
)
return StreamingResponse(
_stream_search_events(request, event_source), media_type="text/event-stream"
_stream_search_events(
request,
_iter_signed_subtitle_search_events(event_source),
),
media_type="text/event-stream",
)
@@ -645,7 +695,7 @@ async def search_subtitle_by_title(
if not subtitles:
return schemas.Response(success=False, message="未搜索到任何字幕")
return schemas.Response(
success=True, data=[subtitle.to_dict() for subtitle in subtitles]
success=True, data=_serialize_signed_subtitle_results(subtitles)
)
@@ -798,7 +848,11 @@ async def search_subtitle_by_id_stream(
yield event
return StreamingResponse(
_stream_search_events(request, event_source()), media_type="text/event-stream"
_stream_search_events(
request,
_iter_signed_subtitle_search_events(event_source()),
),
media_type="text/event-stream",
)
@@ -832,7 +886,7 @@ async def search_subtitle_by_id(
if not subtitles:
return schemas.Response(success=False, message="未搜索到任何字幕")
return schemas.Response(
success=True, data=[subtitle.to_dict() for subtitle in subtitles]
success=True, data=_serialize_signed_subtitle_results(subtitles)
)

View File

@@ -107,6 +107,7 @@ def _resolve_addrinfo_to_ips(
class SecurityUtils:
_SIGNED_URL_PURPOSE = "image-proxy"
_SUBTITLE_DOWNLOAD_PURPOSE_PREFIX = "subtitle-download"
@staticmethod
def is_safe_path(base_path: Path, user_path: Path,
@@ -452,16 +453,23 @@ class SecurityUtils:
@staticmethod
def strip_url_signature(url: str) -> str:
"""
移除 URL fragment 中的代理签名信息,得到真正要请求的地址。
移除 URL fragment 中的资源签名信息,得到真正要请求的地址。
图片代理签名放在 fragment 中,浏览器会把它传给 MoviePilot但 HTTP
客户端请求媒体服务器前不能把这些内部参数带过去。
签名放在 fragment 中,浏览器会把它传给 MoviePilot但 HTTP 客户端
请求外部资源前不能把这些内部参数带过去。
"""
if not url:
return url
parsed_url = urlparse(url)
return urlunparse(parsed_url._replace(fragment=""))
@staticmethod
def subtitle_download_purpose(site_id: int) -> str:
"""
构造字幕下载 URL 签名用途,签名必须绑定站点 ID避免跨站点复用。
"""
return f"{SecurityUtils._SUBTITLE_DOWNLOAD_PURPOSE_PREFIX}:{site_id}"
@staticmethod
def sign_url(
url: str,
@@ -470,9 +478,8 @@ class SecurityUtils:
"""
给服务端返回的资源 URL 添加稳定签名。
签名作为 `/system/img` 代理放行私网图片 URL 的能力凭证:图片代理默认
拒绝解析到非公网地址的 URL防 SSRF合法媒体服务器 URL 必须由后端
预先签名后才能跳过该限制。
签名作为后端资源能力凭证:外部请求边界可以用不同 `purpose` 绑定
具体业务语义,避免一个场景签出的 URL 被挪用到另一个场景。
签名为 `(url, purpose, RESOURCE_SECRET_KEY)` 的确定性 HMAC**不带
过期时间**:相同 URL 多次调用结果完全一致,让浏览器与 Service Worker
@@ -500,7 +507,7 @@ class SecurityUtils:
purpose: str = _SIGNED_URL_PURPOSE,
) -> Optional[str]:
"""
验证 URL fragment 中的代理签名,成功时返回去签名后的真实 URL。
验证 URL fragment 中的资源签名,成功时返回去签名后的真实 URL。
签名只校验 `(url, purpose, RESOURCE_SECRET_KEY)`,密钥轮换/进程重启
后旧签名自动失效。

View File

@@ -0,0 +1,250 @@
import asyncio
import json
from types import SimpleNamespace
import app.api.endpoints.download as download_endpoint
import app.api.endpoints.search as search_endpoint
from app import schemas
from app.core.context import SubtitleInfo
from app.utils.security import SecurityUtils
SUBTITLE_SITE_ID = 1001
SUBTITLE_PURPOSE = f"subtitle-download:{SUBTITLE_SITE_ID}"
SUBTITLE_URL = "https://example.test/downloadsubs.php?torrentid=1&subid=2"
class _NeverDisconnectedRequest:
"""
为 SSE 测试提供始终在线的请求对象。
"""
async def is_disconnected(self):
return False
def _run(coro):
"""
在同步测试中运行异步 endpoint。
"""
return asyncio.run(coro)
async def _collect_sse_events(response):
"""
读取 StreamingResponse 的 SSE 数据并解析为事件字典。
"""
body = ""
async for chunk in response.body_iterator:
if isinstance(chunk, bytes):
body += chunk.decode("utf-8")
else:
body += chunk
events = []
for block in body.strip().split("\n\n"):
if not block:
continue
line = block.removeprefix("data: ")
events.append(json.loads(line))
return events
def _subtitle_payload(enclosure=SUBTITLE_URL, **overrides):
"""
构造下载字幕接口入参。
"""
payload = {
"site": SUBTITLE_SITE_ID,
"site_name": "ExampleSite",
"site_cookie": "client-cookie=1",
"site_ua": "ClientUA",
"site_proxy": False,
"title": "Demo.Movie.2026.zh-cn.srt",
"enclosure": enclosure,
"language": "简体中文",
"size": 1024,
}
payload.update(overrides)
return schemas.SubtitleInfo(**payload)
def test_subtitle_title_search_response_signs_enclosure(monkeypatch):
"""
普通字幕搜索返回给客户端的下载链接必须带字幕下载专用签名。
"""
class FakeSearchChain:
async def async_search_subtitles_by_title(self, **_kwargs):
return [
SubtitleInfo(
site=SUBTITLE_SITE_ID,
site_name="ExampleSite",
title="Demo.Movie.2026.zh-cn.srt",
enclosure=SUBTITLE_URL,
language="简体中文",
size=1024,
)
]
monkeypatch.setattr(search_endpoint, "SearchChain", FakeSearchChain)
response = _run(search_endpoint.search_subtitle_by_title(keyword="Demo", _=None))
assert response.success
signed_url = response.data[0]["enclosure"]
assert signed_url != SUBTITLE_URL
assert SecurityUtils.verify_signed_url(signed_url, purpose=SUBTITLE_PURPOSE) == SUBTITLE_URL
assert SecurityUtils.verify_signed_url(signed_url) is None
def test_subtitle_media_search_response_signs_enclosure(monkeypatch):
"""
精确媒体字幕搜索返回给客户端的下载链接必须带字幕下载专用签名。
"""
async def fake_build_subtitle_search_source(**_kwargs):
async def search_result():
return [
SubtitleInfo(
site=SUBTITLE_SITE_ID,
site_name="ExampleSite",
title="Demo.Movie.2026.zh-cn.srt",
enclosure=SUBTITLE_URL,
language="简体中文",
size=1024,
)
]
return search_result(), ""
monkeypatch.setattr(
search_endpoint,
"_build_subtitle_search_source",
fake_build_subtitle_search_source,
)
response = _run(search_endpoint.search_subtitle_by_id(mediaid="tmdb:1", _=None))
assert response.success
signed_url = response.data[0]["enclosure"]
assert signed_url != SUBTITLE_URL
assert SecurityUtils.verify_signed_url(signed_url, purpose=SUBTITLE_PURPOSE) == SUBTITLE_URL
def test_subtitle_search_sse_events_sign_enclosures(monkeypatch):
"""
字幕 SSE 的 append/replace/done 事件都应只暴露签名后的下载链接。
"""
class FakeSearchChain:
def async_search_subtitles_by_title_stream(self, **_kwargs):
async def source():
for event_type in ("append", "replace", "done"):
yield {
"type": event_type,
"stage": "complete" if event_type == "done" else "searching",
"items": [
{
"site": SUBTITLE_SITE_ID,
"site_name": "ExampleSite",
"title": f"{event_type}.srt",
"enclosure": SUBTITLE_URL,
"language": "简体中文",
"size": 1024,
}
],
}
return source()
monkeypatch.setattr(search_endpoint, "SearchChain", FakeSearchChain)
response = _run(
search_endpoint.search_subtitle_by_title_stream(
request=_NeverDisconnectedRequest(),
keyword="Demo",
_=None,
)
)
events = _run(_collect_sse_events(response))
assert [event["type"] for event in events] == ["append", "replace", "done"]
for event in events:
signed_url = event["items"][0]["enclosure"]
assert signed_url != SUBTITLE_URL
assert SecurityUtils.verify_signed_url(signed_url, purpose=SUBTITLE_PURPOSE) == SUBTITLE_URL
def test_download_subtitle_rejects_unsigned_enclosure(monkeypatch):
"""
下载接口必须拒绝历史未签名字幕链接。
"""
class FakeDownloadChain:
def download_subtitle(self, **_kwargs):
return True, "不应下载", ["unsafe.srt"]
monkeypatch.setattr(download_endpoint, "DownloadChain", FakeDownloadChain)
response = download_endpoint.download_subtitle(
subtitle_in=_subtitle_payload(),
current_user=SimpleNamespace(name="tester"),
)
assert not response.success
assert "签名" in response.message
def test_download_subtitle_rejects_other_purpose_signature(monkeypatch):
"""
其它用途的签名不能复用于字幕下载。
"""
class FakeDownloadChain:
def download_subtitle(self, **_kwargs):
return True, "不应下载", ["unsafe.srt"]
signed_url = SecurityUtils.sign_url(SUBTITLE_URL, purpose="image-proxy")
monkeypatch.setattr(download_endpoint, "DownloadChain", FakeDownloadChain)
response = download_endpoint.download_subtitle(
subtitle_in=_subtitle_payload(enclosure=signed_url),
current_user=SimpleNamespace(name="tester"),
)
assert not response.success
assert "签名" in response.message
def test_download_subtitle_cleans_url_and_uses_server_site_request_fields(monkeypatch):
"""
下载链只应收到去签名后的真实 URL以及服务端站点配置中的请求凭据。
"""
captured = {}
signed_url = SecurityUtils.sign_url(SUBTITLE_URL, purpose=SUBTITLE_PURPOSE)
class FakeSiteOper:
def get(self, site_id):
assert site_id == SUBTITLE_SITE_ID
return SimpleNamespace(cookie="server-cookie=1", ua="ServerUA", proxy=True)
class FakeDownloadChain:
def download_subtitle(self, **kwargs):
captured.update(kwargs)
return True, "字幕下载成功", ["/downloads/Demo.Movie.2026.zh-cn.srt"]
monkeypatch.setattr(download_endpoint, "SiteOper", FakeSiteOper, raising=False)
monkeypatch.setattr(download_endpoint, "DownloadChain", FakeDownloadChain)
response = download_endpoint.download_subtitle(
subtitle_in=_subtitle_payload(enclosure=signed_url),
current_user=SimpleNamespace(name="tester"),
)
subtitle = captured["subtitle"]
assert response.success
assert subtitle.enclosure == SUBTITLE_URL
assert subtitle.site_cookie == "server-cookie=1"
assert subtitle.site_ua == "ServerUA"
assert subtitle.site_proxy is True