mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-03-20 03:46:40 +08:00
@@ -11,6 +11,11 @@ router = APIRouter(prefix="/config", tags=["config"])
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
_SENSITIVE_KEYS = ("password", "api_key", "token", "secret")
|
_SENSITIVE_KEYS = ("password", "api_key", "token", "secret")
|
||||||
|
_MASK = "********"
|
||||||
|
|
||||||
|
|
||||||
|
def _is_sensitive(key: str) -> bool:
|
||||||
|
return any(s in key.lower() for s in _SENSITIVE_KEYS)
|
||||||
|
|
||||||
|
|
||||||
def _sanitize_dict(d: dict) -> dict:
|
def _sanitize_dict(d: dict) -> dict:
|
||||||
@@ -19,13 +24,36 @@ def _sanitize_dict(d: dict) -> dict:
|
|||||||
for k, v in d.items():
|
for k, v in d.items():
|
||||||
if isinstance(v, dict):
|
if isinstance(v, dict):
|
||||||
result[k] = _sanitize_dict(v)
|
result[k] = _sanitize_dict(v)
|
||||||
elif isinstance(v, str) and any(s in k.lower() for s in _SENSITIVE_KEYS):
|
elif isinstance(v, list):
|
||||||
result[k] = "********"
|
result[k] = [
|
||||||
|
_sanitize_dict(item) if isinstance(item, dict) else item for item in v
|
||||||
|
]
|
||||||
|
elif isinstance(v, str) and _is_sensitive(k):
|
||||||
|
result[k] = _MASK
|
||||||
else:
|
else:
|
||||||
result[k] = v
|
result[k] = v
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def _restore_masked(incoming: dict, current: dict) -> dict:
|
||||||
|
"""Replace masked sentinel values with real values from current config."""
|
||||||
|
for k, v in incoming.items():
|
||||||
|
if isinstance(v, dict) and isinstance(current.get(k), dict):
|
||||||
|
_restore_masked(v, current[k])
|
||||||
|
elif isinstance(v, list) and isinstance(current.get(k), list):
|
||||||
|
cur_list = current[k]
|
||||||
|
for i, item in enumerate(v):
|
||||||
|
if (
|
||||||
|
isinstance(item, dict)
|
||||||
|
and i < len(cur_list)
|
||||||
|
and isinstance(cur_list[i], dict)
|
||||||
|
):
|
||||||
|
_restore_masked(item, cur_list[i])
|
||||||
|
elif v == _MASK and _is_sensitive(k):
|
||||||
|
incoming[k] = current.get(k, v)
|
||||||
|
return incoming
|
||||||
|
|
||||||
|
|
||||||
@router.get("/get", dependencies=[Depends(get_current_user)])
|
@router.get("/get", dependencies=[Depends(get_current_user)])
|
||||||
async def get_config():
|
async def get_config():
|
||||||
"""Return the current configuration with sensitive fields masked."""
|
"""Return the current configuration with sensitive fields masked."""
|
||||||
@@ -38,7 +66,8 @@ async def get_config():
|
|||||||
async def update_config(config: Config):
|
async def update_config(config: Config):
|
||||||
"""Persist and reload configuration from the supplied payload."""
|
"""Persist and reload configuration from the supplied payload."""
|
||||||
try:
|
try:
|
||||||
settings.save(config_dict=config.dict())
|
config_dict = _restore_masked(config.dict(), settings.dict())
|
||||||
|
settings.save(config_dict=config_dict)
|
||||||
settings.load()
|
settings.load()
|
||||||
# update_rss()
|
# update_rss()
|
||||||
logger.info("Config updated")
|
logger.info("Config updated")
|
||||||
|
|||||||
@@ -132,7 +132,6 @@ async def test_downloader(req: TestDownloaderRequest):
|
|||||||
|
|
||||||
scheme = "https" if req.ssl else "http"
|
scheme = "https" if req.ssl else "http"
|
||||||
host = req.host if "://" in req.host else f"{scheme}://{req.host}"
|
host = req.host if "://" in req.host else f"{scheme}://{req.host}"
|
||||||
_validate_url(host)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from mcp.server import Server
|
|||||||
from mcp.server.sse import SseServerTransport
|
from mcp.server.sse import SseServerTransport
|
||||||
from starlette.applications import Starlette
|
from starlette.applications import Starlette
|
||||||
from starlette.requests import Request
|
from starlette.requests import Request
|
||||||
|
from starlette.responses import Response
|
||||||
from starlette.routing import Mount, Route
|
from starlette.routing import Mount, Route
|
||||||
|
|
||||||
from .resources import RESOURCE_TEMPLATES, RESOURCES, handle_resource
|
from .resources import RESOURCE_TEMPLATES, RESOURCES, handle_resource
|
||||||
@@ -64,6 +65,7 @@ async def handle_sse(request: Request):
|
|||||||
streams[1],
|
streams[1],
|
||||||
server.create_initialization_options(),
|
server.create_initialization_options(),
|
||||||
)
|
)
|
||||||
|
return Response()
|
||||||
|
|
||||||
|
|
||||||
def create_mcp_starlette_app() -> Starlette:
|
def create_mcp_starlette_app() -> Starlette:
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ class RSSParser(BaseModel):
|
|||||||
"""RSS feed parsing settings."""
|
"""RSS feed parsing settings."""
|
||||||
|
|
||||||
enable: bool = Field(True, description="Enable RSS parser")
|
enable: bool = Field(True, description="Enable RSS parser")
|
||||||
filter: list[str] = Field(["720", r"\d+-\d"], description="Filter")
|
filter: list[str] = Field(["720", r"\d+-\d+"], description="Filter")
|
||||||
language: str = "zh"
|
language: str = "zh"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from fastapi import FastAPI
|
|||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
from module.api import v1
|
from module.api import v1
|
||||||
from module.api.config import _sanitize_dict
|
from module.api.config import _sanitize_dict, _restore_masked
|
||||||
from module.models.config import Config
|
from module.models.config import Config
|
||||||
from module.security.api import get_current_user
|
from module.security.api import get_current_user
|
||||||
|
|
||||||
@@ -306,24 +306,20 @@ class TestSanitizeDict:
|
|||||||
|
|
||||||
def test_nested_dict_recursed(self):
|
def test_nested_dict_recursed(self):
|
||||||
"""Nested dicts are processed recursively."""
|
"""Nested dicts are processed recursively."""
|
||||||
result = _sanitize_dict({
|
result = _sanitize_dict(
|
||||||
"downloader": {
|
{
|
||||||
"host": "localhost",
|
"downloader": {
|
||||||
"password": "secret",
|
"host": "localhost",
|
||||||
|
"password": "secret",
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
)
|
||||||
assert result["downloader"]["host"] == "localhost"
|
assert result["downloader"]["host"] == "localhost"
|
||||||
assert result["downloader"]["password"] == "********"
|
assert result["downloader"]["password"] == "********"
|
||||||
|
|
||||||
def test_deeply_nested_dict(self):
|
def test_deeply_nested_dict(self):
|
||||||
"""Deeply nested sensitive keys are masked."""
|
"""Deeply nested sensitive keys are masked."""
|
||||||
result = _sanitize_dict({
|
result = _sanitize_dict({"level1": {"level2": {"api_key": "deep-secret"}}})
|
||||||
"level1": {
|
|
||||||
"level2": {
|
|
||||||
"api_key": "deep-secret"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
assert result["level1"]["level2"]["api_key"] == "********"
|
assert result["level1"]["level2"]["api_key"] == "********"
|
||||||
|
|
||||||
def test_non_string_value_not_masked(self):
|
def test_non_string_value_not_masked(self):
|
||||||
@@ -338,17 +334,33 @@ class TestSanitizeDict:
|
|||||||
|
|
||||||
def test_mixed_sensitive_and_plain(self):
|
def test_mixed_sensitive_and_plain(self):
|
||||||
"""Mix of sensitive and plain keys handled correctly."""
|
"""Mix of sensitive and plain keys handled correctly."""
|
||||||
result = _sanitize_dict({
|
result = _sanitize_dict(
|
||||||
"username": "admin",
|
{
|
||||||
"password": "secret",
|
"username": "admin",
|
||||||
"host": "10.0.0.1",
|
"password": "secret",
|
||||||
"token": "jwt-abc",
|
"host": "10.0.0.1",
|
||||||
})
|
"token": "jwt-abc",
|
||||||
|
}
|
||||||
|
)
|
||||||
assert result["username"] == "admin"
|
assert result["username"] == "admin"
|
||||||
assert result["host"] == "10.0.0.1"
|
assert result["host"] == "10.0.0.1"
|
||||||
assert result["password"] == "********"
|
assert result["password"] == "********"
|
||||||
assert result["token"] == "********"
|
assert result["token"] == "********"
|
||||||
|
|
||||||
|
def test_sanitize_list_of_dicts(self):
|
||||||
|
"""Lists containing dicts are recursed into."""
|
||||||
|
result = _sanitize_dict(
|
||||||
|
{
|
||||||
|
"providers": [
|
||||||
|
{"type": "telegram", "token": "secret-token"},
|
||||||
|
{"type": "bark", "token": "another-secret"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert result["providers"][0]["token"] == "********"
|
||||||
|
assert result["providers"][1]["token"] == "********"
|
||||||
|
assert result["providers"][0]["type"] == "telegram"
|
||||||
|
|
||||||
def test_get_config_masks_sensitive_fields(self, authed_client):
|
def test_get_config_masks_sensitive_fields(self, authed_client):
|
||||||
"""GET /config/get response masks password and api_key fields."""
|
"""GET /config/get response masks password and api_key fields."""
|
||||||
test_config = Config()
|
test_config = Config()
|
||||||
@@ -360,3 +372,200 @@ class TestSanitizeDict:
|
|||||||
assert data["downloader"]["password"] == "********"
|
assert data["downloader"]["password"] == "********"
|
||||||
# OpenAI api_key should be masked (it's an empty string but still masked)
|
# OpenAI api_key should be masked (it's an empty string but still masked)
|
||||||
assert data["experimental_openai"]["api_key"] == "********"
|
assert data["experimental_openai"]["api_key"] == "********"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _restore_masked unit tests (#995)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestRestoreMasked:
|
||||||
|
"""Issue #995: Masked passwords must not overwrite real credentials."""
|
||||||
|
|
||||||
|
def test_masked_password_restored(self):
|
||||||
|
"""Masked password is replaced with the real stored value."""
|
||||||
|
incoming = {"password": "********"}
|
||||||
|
current = {"password": "real_secret"}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["password"] == "real_secret"
|
||||||
|
|
||||||
|
def test_new_password_preserved(self):
|
||||||
|
"""Non-masked password value is kept as-is."""
|
||||||
|
incoming = {"password": "new_password"}
|
||||||
|
current = {"password": "old_password"}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["password"] == "new_password"
|
||||||
|
|
||||||
|
def test_nested_masked_password_restored(self):
|
||||||
|
"""Masked password inside nested dict is restored."""
|
||||||
|
incoming = {"downloader": {"host": "10.0.0.1", "password": "********"}}
|
||||||
|
current = {"downloader": {"host": "10.0.0.1", "password": "adminadmin"}}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["downloader"]["password"] == "adminadmin"
|
||||||
|
|
||||||
|
def test_nested_new_password_preserved(self):
|
||||||
|
"""Non-masked password inside nested dict is kept."""
|
||||||
|
incoming = {"downloader": {"password": "changed"}}
|
||||||
|
current = {"downloader": {"password": "old"}}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["downloader"]["password"] == "changed"
|
||||||
|
|
||||||
|
def test_multiple_sensitive_fields(self):
|
||||||
|
"""All sensitive fields are handled independently."""
|
||||||
|
incoming = {
|
||||||
|
"downloader": {"password": "********"},
|
||||||
|
"proxy": {"password": "new_proxy_pass"},
|
||||||
|
"experimental_openai": {"api_key": "********"},
|
||||||
|
}
|
||||||
|
current = {
|
||||||
|
"downloader": {"password": "qb_pass"},
|
||||||
|
"proxy": {"password": "old_proxy_pass"},
|
||||||
|
"experimental_openai": {"api_key": "sk-real-key"},
|
||||||
|
}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["downloader"]["password"] == "qb_pass"
|
||||||
|
assert incoming["proxy"]["password"] == "new_proxy_pass"
|
||||||
|
assert incoming["experimental_openai"]["api_key"] == "sk-real-key"
|
||||||
|
|
||||||
|
def test_non_sensitive_mask_value_untouched(self):
|
||||||
|
"""A non-sensitive key with '********' value is not modified."""
|
||||||
|
incoming = {"host": "********"}
|
||||||
|
current = {"host": "10.0.0.1"}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["host"] == "********"
|
||||||
|
|
||||||
|
def test_list_of_dicts_restored(self):
|
||||||
|
"""Masked tokens inside list items are restored."""
|
||||||
|
incoming = {
|
||||||
|
"providers": [
|
||||||
|
{"type": "telegram", "token": "********"},
|
||||||
|
{"type": "bark", "token": "new-bark-token"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
current = {
|
||||||
|
"providers": [
|
||||||
|
{"type": "telegram", "token": "real-tg-token"},
|
||||||
|
{"type": "bark", "token": "old-bark-token"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
_restore_masked(incoming, current)
|
||||||
|
assert incoming["providers"][0]["token"] == "real-tg-token"
|
||||||
|
assert incoming["providers"][1]["token"] == "new-bark-token"
|
||||||
|
|
||||||
|
def test_empty_dicts(self):
|
||||||
|
"""Empty dicts don't cause errors."""
|
||||||
|
_restore_masked({}, {})
|
||||||
|
|
||||||
|
def test_round_trip_preserves_credentials(self):
|
||||||
|
"""Full round-trip: sanitize then restore recovers original values."""
|
||||||
|
original = {
|
||||||
|
"downloader": {"host": "10.0.0.1", "password": "secret123"},
|
||||||
|
"experimental_openai": {"api_key": "sk-abc", "model": "gpt-4"},
|
||||||
|
}
|
||||||
|
sanitized = _sanitize_dict(original)
|
||||||
|
assert sanitized["downloader"]["password"] == "********"
|
||||||
|
assert sanitized["experimental_openai"]["api_key"] == "********"
|
||||||
|
|
||||||
|
_restore_masked(sanitized, original)
|
||||||
|
assert sanitized["downloader"]["password"] == "secret123"
|
||||||
|
assert sanitized["experimental_openai"]["api_key"] == "sk-abc"
|
||||||
|
assert sanitized["downloader"]["host"] == "10.0.0.1"
|
||||||
|
assert sanitized["experimental_openai"]["model"] == "gpt-4"
|
||||||
|
|
||||||
|
def test_update_config_preserves_password_when_masked(
|
||||||
|
self, authed_client, mock_settings
|
||||||
|
):
|
||||||
|
"""PATCH /config/update must not overwrite a real password with '********'."""
|
||||||
|
mock_settings.dict.return_value = {
|
||||||
|
"program": {"rss_time": 900, "rename_time": 60, "webui_port": 7892},
|
||||||
|
"downloader": {
|
||||||
|
"type": "qbittorrent",
|
||||||
|
"host": "192.168.1.1:8080",
|
||||||
|
"username": "admin",
|
||||||
|
"password": "realpassword",
|
||||||
|
"path": "/downloads",
|
||||||
|
"ssl": True,
|
||||||
|
},
|
||||||
|
"rss_parser": {"enable": True, "filter": [], "language": "zh"},
|
||||||
|
"bangumi_manage": {
|
||||||
|
"enable": True,
|
||||||
|
"eps_complete": False,
|
||||||
|
"rename_method": "pn",
|
||||||
|
"group_tag": False,
|
||||||
|
"remove_bad_torrent": False,
|
||||||
|
},
|
||||||
|
"log": {"debug_enable": False},
|
||||||
|
"proxy": {
|
||||||
|
"enable": False,
|
||||||
|
"type": "http",
|
||||||
|
"host": "",
|
||||||
|
"port": 0,
|
||||||
|
"username": "",
|
||||||
|
"password": "",
|
||||||
|
},
|
||||||
|
"notification": {
|
||||||
|
"enable": False,
|
||||||
|
"type": "telegram",
|
||||||
|
"token": "",
|
||||||
|
"chat_id": "",
|
||||||
|
},
|
||||||
|
"experimental_openai": {
|
||||||
|
"enable": False,
|
||||||
|
"api_key": "",
|
||||||
|
"api_base": "https://api.openai.com/v1",
|
||||||
|
"api_type": "openai",
|
||||||
|
"api_version": "2023-05-15",
|
||||||
|
"model": "gpt-3.5-turbo",
|
||||||
|
"deployment_id": "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
payload = {
|
||||||
|
"program": {"rss_time": 900, "rename_time": 60, "webui_port": 7892},
|
||||||
|
"downloader": {
|
||||||
|
"type": "qbittorrent",
|
||||||
|
"host": "192.168.1.1:8080",
|
||||||
|
"username": "admin",
|
||||||
|
"password": "********",
|
||||||
|
"path": "/downloads",
|
||||||
|
"ssl": False,
|
||||||
|
},
|
||||||
|
"rss_parser": {"enable": True, "filter": [], "language": "zh"},
|
||||||
|
"bangumi_manage": {
|
||||||
|
"enable": True,
|
||||||
|
"eps_complete": False,
|
||||||
|
"rename_method": "pn",
|
||||||
|
"group_tag": False,
|
||||||
|
"remove_bad_torrent": False,
|
||||||
|
},
|
||||||
|
"log": {"debug_enable": False},
|
||||||
|
"proxy": {
|
||||||
|
"enable": False,
|
||||||
|
"type": "http",
|
||||||
|
"host": "",
|
||||||
|
"port": 0,
|
||||||
|
"username": "",
|
||||||
|
"password": "",
|
||||||
|
},
|
||||||
|
"notification": {
|
||||||
|
"enable": False,
|
||||||
|
"type": "telegram",
|
||||||
|
"token": "",
|
||||||
|
"chat_id": "",
|
||||||
|
},
|
||||||
|
"experimental_openai": {
|
||||||
|
"enable": False,
|
||||||
|
"api_key": "",
|
||||||
|
"api_base": "https://api.openai.com/v1",
|
||||||
|
"api_type": "openai",
|
||||||
|
"api_version": "2023-05-15",
|
||||||
|
"model": "gpt-3.5-turbo",
|
||||||
|
"deployment_id": "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
with patch("module.api.config.settings", mock_settings):
|
||||||
|
response = authed_client.patch("/api/v1/config/update", json=payload)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
saved = mock_settings.save.call_args[1]["config_dict"]
|
||||||
|
assert saved["downloader"]["password"] == "realpassword"
|
||||||
|
assert saved["downloader"]["ssl"] is False
|
||||||
|
|||||||
@@ -124,57 +124,109 @@ class TestSetupGuard:
|
|||||||
|
|
||||||
|
|
||||||
class TestTestDownloader:
|
class TestTestDownloader:
|
||||||
|
def test_private_ip_accepted(self, client, mock_first_run):
|
||||||
|
"""Issue #1001: Private IPs must not be rejected for downloader test."""
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_instance = AsyncMock()
|
||||||
|
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||||
|
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||||
|
return_value=mock_instance
|
||||||
|
)
|
||||||
|
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
"/api/v1/setup/test-downloader",
|
||||||
|
json={
|
||||||
|
"type": "qbittorrent",
|
||||||
|
"host": "192.168.1.100:8080",
|
||||||
|
"username": "admin",
|
||||||
|
"password": "admin",
|
||||||
|
"ssl": False,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
# Should reach the connection attempt, not get blocked by IP validation
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert data["success"] is False
|
||||||
|
assert "connect" in data["message_en"].lower()
|
||||||
|
|
||||||
|
def test_loopback_ip_accepted(self, client, mock_first_run):
|
||||||
|
"""Issue #1001: Loopback IPs must not be rejected for downloader test."""
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||||
|
mock_instance = AsyncMock()
|
||||||
|
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||||
|
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||||
|
return_value=mock_instance
|
||||||
|
)
|
||||||
|
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
"/api/v1/setup/test-downloader",
|
||||||
|
json={
|
||||||
|
"type": "qbittorrent",
|
||||||
|
"host": "127.0.0.1:8080",
|
||||||
|
"username": "admin",
|
||||||
|
"password": "admin",
|
||||||
|
"ssl": False,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert data["success"] is False
|
||||||
|
|
||||||
def test_connection_timeout(self, client, mock_first_run):
|
def test_connection_timeout(self, client, mock_first_run):
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
with patch("module.api.setup._validate_url"):
|
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
mock_instance = AsyncMock()
|
||||||
mock_instance = AsyncMock()
|
mock_instance.get.side_effect = httpx.TimeoutException("timeout")
|
||||||
mock_instance.get.side_effect = httpx.TimeoutException("timeout")
|
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
return_value=mock_instance
|
||||||
return_value=mock_instance
|
)
|
||||||
)
|
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
|
||||||
|
|
||||||
response = client.post(
|
response = client.post(
|
||||||
"/api/v1/setup/test-downloader",
|
"/api/v1/setup/test-downloader",
|
||||||
json={
|
json={
|
||||||
"type": "qbittorrent",
|
"type": "qbittorrent",
|
||||||
"host": "localhost:8080",
|
"host": "localhost:8080",
|
||||||
"username": "admin",
|
"username": "admin",
|
||||||
"password": "admin",
|
"password": "admin",
|
||||||
"ssl": False,
|
"ssl": False,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
data = response.json()
|
data = response.json()
|
||||||
assert data["success"] is False
|
assert data["success"] is False
|
||||||
|
|
||||||
def test_connection_refused(self, client, mock_first_run):
|
def test_connection_refused(self, client, mock_first_run):
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
with patch("module.api.setup._validate_url"):
|
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
mock_instance = AsyncMock()
|
||||||
mock_instance = AsyncMock()
|
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||||
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
return_value=mock_instance
|
||||||
return_value=mock_instance
|
)
|
||||||
)
|
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
|
||||||
|
|
||||||
response = client.post(
|
response = client.post(
|
||||||
"/api/v1/setup/test-downloader",
|
"/api/v1/setup/test-downloader",
|
||||||
json={
|
json={
|
||||||
"type": "qbittorrent",
|
"type": "qbittorrent",
|
||||||
"host": "localhost:8080",
|
"host": "localhost:8080",
|
||||||
"username": "admin",
|
"username": "admin",
|
||||||
"password": "admin",
|
"password": "admin",
|
||||||
"ssl": False,
|
"ssl": False,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
data = response.json()
|
data = response.json()
|
||||||
assert data["success"] is False
|
assert data["success"] is False
|
||||||
|
|
||||||
|
|
||||||
class TestTestRSS:
|
class TestTestRSS:
|
||||||
|
|||||||
Reference in New Issue
Block a user