import asyncio import sys import unittest from types import ModuleType, SimpleNamespace from unittest.mock import patch def _stub_module(name: str, **attrs): module = sys.modules.get(name) if module is None: module = ModuleType(name) sys.modules[name] = module for key, value in attrs.items(): setattr(module, key, value) return module class _Dummy: def __init__(self, *args, **kwargs): pass def __getattr__(self, _name): return lambda *args, **kwargs: None class _DummyError(Exception): def __init__(self, message="", duration_ms=None): super().__init__(message) self.duration_ms = duration_ms for _module_name in ("pillow_avif", "aiofiles", "psutil"): _stub_module(_module_name) _stub_module("app.helper.sites", SitesHelper=_Dummy) _stub_module("app.chain.mediaserver", MediaServerChain=_Dummy) _stub_module("app.chain.search", SearchChain=_Dummy) _stub_module("app.chain.system", SystemChain=_Dummy) _stub_module("app.core.event", eventmanager=_Dummy()) _stub_module("app.core.metainfo", MetaInfo=_Dummy) _stub_module("app.core.module", ModuleManager=_Dummy) _stub_module( "app.core.security", verify_apitoken=_Dummy, verify_resource_token=_Dummy, verify_token=_Dummy, ) _stub_module("app.db.models", User=_Dummy) _stub_module("app.db.systemconfig_oper", SystemConfigOper=_Dummy) _stub_module( "app.db.user_oper", get_current_active_superuser=_Dummy, get_current_active_superuser_async=_Dummy, get_current_active_user_async=_Dummy, ) _stub_module( "app.helper.llm", LLMHelper=_Dummy, LLMTestError=_DummyError, LLMTestTimeout=_DummyError, ) _stub_module("app.helper.mediaserver", MediaServerHelper=_Dummy) _stub_module("app.helper.message", MessageHelper=_Dummy) _stub_module("app.helper.progress", ProgressHelper=_Dummy) _stub_module("app.helper.rule", RuleHelper=_Dummy) _stub_module("app.helper.subscribe", SubscribeHelper=_Dummy) _stub_module("app.helper.system", SystemHelper=_Dummy) _stub_module("app.helper.image", ImageHelper=_Dummy) _stub_module("app.scheduler", Scheduler=_Dummy) _stub_module( "app.log", logger=_Dummy(), log_settings=_Dummy(), LogConfigModel=type("LogConfigModel", (), {}), ) _stub_module("app.utils.crypto", HashUtils=_Dummy) _stub_module("app.utils.http", RequestUtils=_Dummy, AsyncRequestUtils=_Dummy) _stub_module("version", APP_VERSION="test") from app.api.endpoints import system as system_endpoint class NettestSecurityTest(unittest.TestCase): def test_nettest_targets_are_served_by_backend(self): resp = asyncio.run(system_endpoint.nettest_targets(_="token")) self.assertTrue(resp.success) self.assertTrue(any(item["id"] == "pip_proxy" for item in resp.data)) self.assertTrue(any(item["id"] == "github_proxy_web" for item in resp.data)) def test_nettest_blocks_unknown_target(self): class FailIfCalled: def __init__(self, *args, **kwargs): raise AssertionError("nettest should reject unknown targets before any outbound request") with patch.object(system_endpoint, "AsyncRequestUtils", FailIfCalled): resp = asyncio.run( system_endpoint.nettest( target_id="unknown-target", _="token", ) ) self.assertFalse(resp.success) self.assertIn("不存在", resp.message) def test_nettest_blocks_unapproved_redirect(self): captured = {"calls": 0} class FakeResponse: def __init__(self, status_code, headers=None, text=""): self.status_code = status_code self.headers = headers or {} self.text = text async def aclose(self): return None class FakeAsyncRequestUtils: def __init__(self, **kwargs): captured["init_kwargs"] = kwargs async def get_res(self, url, allow_redirects=True): captured["calls"] += 1 return FakeResponse( 302, headers={"location": "https://169.254.169.254/latest/meta-data/"}, ) with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object( system_endpoint.settings, "GITHUB_PROXY", "https://ghproxy.example/", ): resp = asyncio.run( system_endpoint.nettest( target_id="github_proxy_web", _="token", ) ) self.assertFalse(resp.success) self.assertIn("跳转", resp.message) self.assertEqual(captured["calls"], 1) def test_nettest_allows_known_external_redirects(self): cases = { "telegram_api": "https://core.telegram.org/bots", "douban_api": "https://www.douban.com/doubanapp/frodo?wechat=0&os=Other", "github_codeload": "https://github.com/", } for target_id, redirect_url in cases.items(): call_urls = [] class FakeResponse: def __init__(self, status_code, headers=None, text=""): self.status_code = status_code self.headers = headers or {} self.text = text async def aclose(self): return None class FakeAsyncRequestUtils: def __init__(self, **kwargs): pass async def get_res(self, url, allow_redirects=True): call_urls.append(url) if len(call_urls) == 1: return FakeResponse(302, headers={"location": redirect_url}) return FakeResponse(200, text="ok") with self.subTest(target_id=target_id), patch.object( system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils, ): resp = asyncio.run( system_endpoint.nettest( target_id=target_id, _="token", ) ) self.assertTrue(resp.success) self.assertEqual(len(call_urls), 2) def test_nettest_uses_safe_http_options_and_server_side_content_check(self): captured = {} class FakeAsyncRequestUtils: def __init__(self, **kwargs): captured["init_kwargs"] = kwargs async def get_res(self, url, allow_redirects=True): captured["url"] = url captured["allow_redirects"] = allow_redirects return SimpleNamespace(status_code=200, text="MoviePilot README") with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object( system_endpoint.settings, "GITHUB_PROXY", "https://ghproxy.example/", ): resp = asyncio.run( system_endpoint.nettest( target_id="github_proxy_web", include="tag_name", _="token", ) ) self.assertTrue(resp.success) self.assertEqual( captured["url"], "https://ghproxy.example/https://github.com/jxxghp/MoviePilot/blob/v2/README.md", ) self.assertFalse(captured["allow_redirects"]) self.assertTrue(captured["init_kwargs"]["verify"]) self.assertFalse(captured["init_kwargs"]["follow_redirects"]) def test_nettest_fails_when_expected_content_is_missing(self): class FakeAsyncRequestUtils: def __init__(self, **kwargs): pass async def get_res(self, url, allow_redirects=True): return SimpleNamespace(status_code=200, text="proxy landing page") with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object( system_endpoint.settings, "PIP_PROXY", "https://pypi.tuna.tsinghua.edu.cn/simple/", ): resp = asyncio.run( system_endpoint.nettest( target_id="pip_proxy", _="token", ) ) self.assertFalse(resp.success) self.assertIn("PIP加速代理", resp.message) if __name__ == "__main__": unittest.main()