diff --git a/app/api/endpoints/download.py b/app/api/endpoints/download.py index 922a569a..5dd07d7b 100644 --- a/app/api/endpoints/download.py +++ b/app/api/endpoints/download.py @@ -9,14 +9,41 @@ from app.core.context import MediaInfo, Context, SubtitleInfo, TorrentInfo from app.core.metainfo import MetaInfo from app.core.security import verify_token from app.db.models.user import User +from app.db.site_oper import SiteOper from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_user from app.helper.directory import DirectoryHelper from app.schemas.types import SystemConfigKey +from app.utils.security import SecurityUtils router = APIRouter() +def _prepare_subtitle_download(subtitle: SubtitleInfo) -> tuple[bool, str]: + """ + 校验字幕下载签名,并用服务端站点配置覆盖请求凭据。 + """ + if subtitle.site is None: + return False, "字幕站点信息为空" + + clean_url = SecurityUtils.verify_signed_url( + subtitle.enclosure, + purpose=SecurityUtils.subtitle_download_purpose(subtitle.site), + ) + if not clean_url: + return False, "字幕下载链接签名无效" + + site = SiteOper().get(subtitle.site) + if not site: + return False, "字幕站点信息不存在" + + subtitle.enclosure = clean_url + subtitle.site_cookie = site.cookie + subtitle.site_ua = site.ua + subtitle.site_proxy = bool(site.proxy) + return True, "" + + @router.get("/", summary="正在下载", response_model=List[schemas.DownloaderTorrent]) def current( name: Optional[str] = None, _: schemas.TokenPayload = Depends(verify_token) @@ -127,6 +154,10 @@ def download_subtitle( """ subtitle_info = SubtitleInfo() subtitle_info.from_dict(subtitle_in.model_dump()) + valid, message = _prepare_subtitle_download(subtitle_info) + if not valid: + return schemas.Response(success=False, message=message) + success, message, saved_files = DownloadChain().download_subtitle( subtitle=subtitle_info, tmdbid=tmdbid, diff --git a/app/api/endpoints/search.py b/app/api/endpoints/search.py index 43ea280a..dff33965 100644 --- a/app/api/endpoints/search.py +++ b/app/api/endpoints/search.py @@ -15,6 +15,7 @@ from app.core.security import verify_resource_token, verify_token from app.log import logger from app.schemas import MediaRecognizeConvertEventData from app.schemas.types import MediaType, ChainEventType +from app.utils.security import SecurityUtils router = APIRouter() @@ -45,6 +46,49 @@ def _sse_event(data: dict) -> str: return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" +def _serialize_signed_subtitle_result(subtitle: Any) -> dict: + """ + 序列化字幕结果并签名下载链接,签名用途绑定站点 ID。 + """ + data = subtitle.to_dict() if hasattr(subtitle, "to_dict") else dict(subtitle) + enclosure = data.get("enclosure") + if enclosure: + data["enclosure"] = SecurityUtils.sign_url( + enclosure, + purpose=SecurityUtils.subtitle_download_purpose(data.get("site")), + ) + return data + + +def _serialize_signed_subtitle_results(subtitles: List[Any]) -> List[dict]: + """ + 批量序列化字幕结果,确保返回给客户端的下载链接均已签名。 + """ + return [_serialize_signed_subtitle_result(subtitle) for subtitle in subtitles] + + +def _sign_subtitle_search_event(event: dict) -> dict: + """ + 签名字幕搜索流事件中的下载链接。 + """ + signed_event = dict(event) + if "items" in signed_event: + signed_event["items"] = _serialize_signed_subtitle_results( + signed_event.get("items") or [] + ) + return signed_event + + +async def _iter_signed_subtitle_search_events( + event_source: AsyncIterator[dict], +) -> AsyncIterator[dict]: + """ + 输出仅包含签名字幕下载链接的搜索流事件。 + """ + async for event in event_source: + yield _sign_subtitle_search_event(event) + + def _merge_append_event(pending_event: Optional[dict], event: dict) -> dict: """ 合并短时间内连续到达的 append 事件,降低前端刷新频率。 @@ -168,7 +212,9 @@ async def search_latest_context(_: schemas.TokenPayload = Depends(verify_token)) success=True, data={ "params": params, - "results": [result.to_dict() for result in results], + "results": _serialize_signed_subtitle_results(results) + if params.get("result_type") == "subtitle" + else [result.to_dict() for result in results], }, ) @@ -625,7 +671,11 @@ async def search_subtitle_by_title_stream( title=keyword, page=page, sites=_parse_site_list(sites), cache_local=True ) return StreamingResponse( - _stream_search_events(request, event_source), media_type="text/event-stream" + _stream_search_events( + request, + _iter_signed_subtitle_search_events(event_source), + ), + media_type="text/event-stream", ) @@ -645,7 +695,7 @@ async def search_subtitle_by_title( if not subtitles: return schemas.Response(success=False, message="未搜索到任何字幕") return schemas.Response( - success=True, data=[subtitle.to_dict() for subtitle in subtitles] + success=True, data=_serialize_signed_subtitle_results(subtitles) ) @@ -798,7 +848,11 @@ async def search_subtitle_by_id_stream( yield event return StreamingResponse( - _stream_search_events(request, event_source()), media_type="text/event-stream" + _stream_search_events( + request, + _iter_signed_subtitle_search_events(event_source()), + ), + media_type="text/event-stream", ) @@ -832,7 +886,7 @@ async def search_subtitle_by_id( if not subtitles: return schemas.Response(success=False, message="未搜索到任何字幕") return schemas.Response( - success=True, data=[subtitle.to_dict() for subtitle in subtitles] + success=True, data=_serialize_signed_subtitle_results(subtitles) ) diff --git a/app/utils/security.py b/app/utils/security.py index 054a3030..71619134 100644 --- a/app/utils/security.py +++ b/app/utils/security.py @@ -107,6 +107,7 @@ def _resolve_addrinfo_to_ips( class SecurityUtils: _SIGNED_URL_PURPOSE = "image-proxy" + _SUBTITLE_DOWNLOAD_PURPOSE_PREFIX = "subtitle-download" @staticmethod def is_safe_path(base_path: Path, user_path: Path, @@ -452,16 +453,23 @@ class SecurityUtils: @staticmethod def strip_url_signature(url: str) -> str: """ - 移除 URL fragment 中的代理签名信息,得到真正要请求的地址。 + 移除 URL fragment 中的资源签名信息,得到真正要请求的地址。 - 图片代理签名放在 fragment 中,浏览器会把它传给 MoviePilot,但 HTTP - 客户端请求媒体服务器前不能把这些内部参数带过去。 + 签名放在 fragment 中,浏览器会把它传给 MoviePilot,但 HTTP 客户端 + 请求外部资源前不能把这些内部参数带过去。 """ if not url: return url parsed_url = urlparse(url) return urlunparse(parsed_url._replace(fragment="")) + @staticmethod + def subtitle_download_purpose(site_id: int) -> str: + """ + 构造字幕下载 URL 签名用途,签名必须绑定站点 ID,避免跨站点复用。 + """ + return f"{SecurityUtils._SUBTITLE_DOWNLOAD_PURPOSE_PREFIX}:{site_id}" + @staticmethod def sign_url( url: str, @@ -470,9 +478,8 @@ class SecurityUtils: """ 给服务端返回的资源 URL 添加稳定签名。 - 签名作为 `/system/img` 代理放行私网图片 URL 的能力凭证:图片代理默认 - 拒绝解析到非公网地址的 URL(防 SSRF),合法媒体服务器 URL 必须由后端 - 预先签名后才能跳过该限制。 + 签名作为后端资源能力凭证:外部请求边界可以用不同 `purpose` 绑定 + 具体业务语义,避免一个场景签出的 URL 被挪用到另一个场景。 签名为 `(url, purpose, RESOURCE_SECRET_KEY)` 的确定性 HMAC,**不带 过期时间**:相同 URL 多次调用结果完全一致,让浏览器与 Service Worker @@ -500,7 +507,7 @@ class SecurityUtils: purpose: str = _SIGNED_URL_PURPOSE, ) -> Optional[str]: """ - 验证 URL fragment 中的代理签名,成功时返回去签名后的真实 URL。 + 验证 URL fragment 中的资源签名,成功时返回去签名后的真实 URL。 签名只校验 `(url, purpose, RESOURCE_SECRET_KEY)`,密钥轮换/进程重启 后旧签名自动失效。 diff --git a/tests/test_subtitle_signed_download.py b/tests/test_subtitle_signed_download.py new file mode 100644 index 00000000..019cb5b8 --- /dev/null +++ b/tests/test_subtitle_signed_download.py @@ -0,0 +1,250 @@ +import asyncio +import json +from types import SimpleNamespace + +import app.api.endpoints.download as download_endpoint +import app.api.endpoints.search as search_endpoint +from app import schemas +from app.core.context import SubtitleInfo +from app.utils.security import SecurityUtils + + +SUBTITLE_SITE_ID = 1001 +SUBTITLE_PURPOSE = f"subtitle-download:{SUBTITLE_SITE_ID}" +SUBTITLE_URL = "https://example.test/downloadsubs.php?torrentid=1&subid=2" + + +class _NeverDisconnectedRequest: + """ + 为 SSE 测试提供始终在线的请求对象。 + """ + + async def is_disconnected(self): + return False + + +def _run(coro): + """ + 在同步测试中运行异步 endpoint。 + """ + return asyncio.run(coro) + + +async def _collect_sse_events(response): + """ + 读取 StreamingResponse 的 SSE 数据并解析为事件字典。 + """ + body = "" + async for chunk in response.body_iterator: + if isinstance(chunk, bytes): + body += chunk.decode("utf-8") + else: + body += chunk + + events = [] + for block in body.strip().split("\n\n"): + if not block: + continue + line = block.removeprefix("data: ") + events.append(json.loads(line)) + return events + + +def _subtitle_payload(enclosure=SUBTITLE_URL, **overrides): + """ + 构造下载字幕接口入参。 + """ + payload = { + "site": SUBTITLE_SITE_ID, + "site_name": "ExampleSite", + "site_cookie": "client-cookie=1", + "site_ua": "ClientUA", + "site_proxy": False, + "title": "Demo.Movie.2026.zh-cn.srt", + "enclosure": enclosure, + "language": "简体中文", + "size": 1024, + } + payload.update(overrides) + return schemas.SubtitleInfo(**payload) + + +def test_subtitle_title_search_response_signs_enclosure(monkeypatch): + """ + 普通字幕搜索返回给客户端的下载链接必须带字幕下载专用签名。 + """ + + class FakeSearchChain: + async def async_search_subtitles_by_title(self, **_kwargs): + return [ + SubtitleInfo( + site=SUBTITLE_SITE_ID, + site_name="ExampleSite", + title="Demo.Movie.2026.zh-cn.srt", + enclosure=SUBTITLE_URL, + language="简体中文", + size=1024, + ) + ] + + monkeypatch.setattr(search_endpoint, "SearchChain", FakeSearchChain) + + response = _run(search_endpoint.search_subtitle_by_title(keyword="Demo", _=None)) + + assert response.success + signed_url = response.data[0]["enclosure"] + assert signed_url != SUBTITLE_URL + assert SecurityUtils.verify_signed_url(signed_url, purpose=SUBTITLE_PURPOSE) == SUBTITLE_URL + assert SecurityUtils.verify_signed_url(signed_url) is None + + +def test_subtitle_media_search_response_signs_enclosure(monkeypatch): + """ + 精确媒体字幕搜索返回给客户端的下载链接必须带字幕下载专用签名。 + """ + + async def fake_build_subtitle_search_source(**_kwargs): + async def search_result(): + return [ + SubtitleInfo( + site=SUBTITLE_SITE_ID, + site_name="ExampleSite", + title="Demo.Movie.2026.zh-cn.srt", + enclosure=SUBTITLE_URL, + language="简体中文", + size=1024, + ) + ] + + return search_result(), "" + + monkeypatch.setattr( + search_endpoint, + "_build_subtitle_search_source", + fake_build_subtitle_search_source, + ) + + response = _run(search_endpoint.search_subtitle_by_id(mediaid="tmdb:1", _=None)) + + assert response.success + signed_url = response.data[0]["enclosure"] + assert signed_url != SUBTITLE_URL + assert SecurityUtils.verify_signed_url(signed_url, purpose=SUBTITLE_PURPOSE) == SUBTITLE_URL + + +def test_subtitle_search_sse_events_sign_enclosures(monkeypatch): + """ + 字幕 SSE 的 append/replace/done 事件都应只暴露签名后的下载链接。 + """ + + class FakeSearchChain: + def async_search_subtitles_by_title_stream(self, **_kwargs): + async def source(): + for event_type in ("append", "replace", "done"): + yield { + "type": event_type, + "stage": "complete" if event_type == "done" else "searching", + "items": [ + { + "site": SUBTITLE_SITE_ID, + "site_name": "ExampleSite", + "title": f"{event_type}.srt", + "enclosure": SUBTITLE_URL, + "language": "简体中文", + "size": 1024, + } + ], + } + + return source() + + monkeypatch.setattr(search_endpoint, "SearchChain", FakeSearchChain) + + response = _run( + search_endpoint.search_subtitle_by_title_stream( + request=_NeverDisconnectedRequest(), + keyword="Demo", + _=None, + ) + ) + events = _run(_collect_sse_events(response)) + + assert [event["type"] for event in events] == ["append", "replace", "done"] + for event in events: + signed_url = event["items"][0]["enclosure"] + assert signed_url != SUBTITLE_URL + assert SecurityUtils.verify_signed_url(signed_url, purpose=SUBTITLE_PURPOSE) == SUBTITLE_URL + + +def test_download_subtitle_rejects_unsigned_enclosure(monkeypatch): + """ + 下载接口必须拒绝历史未签名字幕链接。 + """ + + class FakeDownloadChain: + def download_subtitle(self, **_kwargs): + return True, "不应下载", ["unsafe.srt"] + + monkeypatch.setattr(download_endpoint, "DownloadChain", FakeDownloadChain) + + response = download_endpoint.download_subtitle( + subtitle_in=_subtitle_payload(), + current_user=SimpleNamespace(name="tester"), + ) + + assert not response.success + assert "签名" in response.message + + +def test_download_subtitle_rejects_other_purpose_signature(monkeypatch): + """ + 其它用途的签名不能复用于字幕下载。 + """ + + class FakeDownloadChain: + def download_subtitle(self, **_kwargs): + return True, "不应下载", ["unsafe.srt"] + + signed_url = SecurityUtils.sign_url(SUBTITLE_URL, purpose="image-proxy") + monkeypatch.setattr(download_endpoint, "DownloadChain", FakeDownloadChain) + + response = download_endpoint.download_subtitle( + subtitle_in=_subtitle_payload(enclosure=signed_url), + current_user=SimpleNamespace(name="tester"), + ) + + assert not response.success + assert "签名" in response.message + + +def test_download_subtitle_cleans_url_and_uses_server_site_request_fields(monkeypatch): + """ + 下载链只应收到去签名后的真实 URL,以及服务端站点配置中的请求凭据。 + """ + captured = {} + signed_url = SecurityUtils.sign_url(SUBTITLE_URL, purpose=SUBTITLE_PURPOSE) + + class FakeSiteOper: + def get(self, site_id): + assert site_id == SUBTITLE_SITE_ID + return SimpleNamespace(cookie="server-cookie=1", ua="ServerUA", proxy=True) + + class FakeDownloadChain: + def download_subtitle(self, **kwargs): + captured.update(kwargs) + return True, "字幕下载成功", ["/downloads/Demo.Movie.2026.zh-cn.srt"] + + monkeypatch.setattr(download_endpoint, "SiteOper", FakeSiteOper, raising=False) + monkeypatch.setattr(download_endpoint, "DownloadChain", FakeDownloadChain) + + response = download_endpoint.download_subtitle( + subtitle_in=_subtitle_payload(enclosure=signed_url), + current_user=SimpleNamespace(name="tester"), + ) + + subtitle = captured["subtitle"] + assert response.success + assert subtitle.enclosure == SUBTITLE_URL + assert subtitle.site_cookie == "server-cookie=1" + assert subtitle.site_ua == "ServerUA" + assert subtitle.site_proxy is True