mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-03-20 03:46:40 +08:00
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,11 @@ router = APIRouter(prefix="/config", tags=["config"])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SENSITIVE_KEYS = ("password", "api_key", "token", "secret")
|
||||
_MASK = "********"
|
||||
|
||||
|
||||
def _is_sensitive(key: str) -> bool:
|
||||
return any(s in key.lower() for s in _SENSITIVE_KEYS)
|
||||
|
||||
|
||||
def _sanitize_dict(d: dict) -> dict:
|
||||
@@ -19,36 +24,34 @@ def _sanitize_dict(d: dict) -> dict:
|
||||
for k, v in d.items():
|
||||
if isinstance(v, dict):
|
||||
result[k] = _sanitize_dict(v)
|
||||
elif isinstance(v, str) and any(s in k.lower() for s in _SENSITIVE_KEYS):
|
||||
result[k] = "********"
|
||||
elif isinstance(v, list):
|
||||
result[k] = [
|
||||
_sanitize_dict(item) if isinstance(item, dict) else item for item in v
|
||||
]
|
||||
elif isinstance(v, str) and _is_sensitive(k):
|
||||
result[k] = _MASK
|
||||
else:
|
||||
result[k] = v
|
||||
return result
|
||||
|
||||
|
||||
def _restore_sensitive(incoming: dict, current: dict) -> dict:
|
||||
"""Replace masked '********' values with the real values from current config.
|
||||
|
||||
When the frontend submits a config update it sends back the masked
|
||||
placeholder for every sensitive field (password, token, …). Saving that
|
||||
placeholder verbatim would overwrite the real credential with the literal
|
||||
string '********'. This function walks the incoming dict and, wherever it
|
||||
finds the placeholder, substitutes the value that is already stored in the
|
||||
running settings.
|
||||
"""
|
||||
result = {}
|
||||
def _restore_masked(incoming: dict, current: dict) -> dict:
|
||||
"""Replace masked sentinel values with real values from current config."""
|
||||
for k, v in incoming.items():
|
||||
if isinstance(v, dict):
|
||||
result[k] = _restore_sensitive(v, current.get(k, {}))
|
||||
elif (
|
||||
isinstance(v, str)
|
||||
and v == "********"
|
||||
and any(s in k.lower() for s in _SENSITIVE_KEYS)
|
||||
):
|
||||
result[k] = current.get(k, v)
|
||||
else:
|
||||
result[k] = v
|
||||
return result
|
||||
if isinstance(v, dict) and isinstance(current.get(k), dict):
|
||||
_restore_masked(v, current[k])
|
||||
elif isinstance(v, list) and isinstance(current.get(k), list):
|
||||
cur_list = current[k]
|
||||
for i, item in enumerate(v):
|
||||
if (
|
||||
isinstance(item, dict)
|
||||
and i < len(cur_list)
|
||||
and isinstance(cur_list[i], dict)
|
||||
):
|
||||
_restore_masked(item, cur_list[i])
|
||||
elif v == _MASK and _is_sensitive(k):
|
||||
incoming[k] = current.get(k, v)
|
||||
return incoming
|
||||
|
||||
|
||||
@router.get("/get", dependencies=[Depends(get_current_user)])
|
||||
@@ -63,10 +66,8 @@ async def get_config():
|
||||
async def update_config(config: Config):
|
||||
"""Persist and reload configuration from the supplied payload."""
|
||||
try:
|
||||
incoming = config.dict()
|
||||
current = settings.dict()
|
||||
restored = _restore_sensitive(incoming, current)
|
||||
settings.save(config_dict=restored)
|
||||
config_dict = _restore_masked(config.dict(), settings.dict())
|
||||
settings.save(config_dict=config_dict)
|
||||
settings.load()
|
||||
# update_rss()
|
||||
logger.info("Config updated")
|
||||
|
||||
@@ -132,7 +132,6 @@ async def test_downloader(req: TestDownloaderRequest):
|
||||
|
||||
scheme = "https" if req.ssl else "http"
|
||||
host = req.host if "://" in req.host else f"{scheme}://{req.host}"
|
||||
_validate_url(host)
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
|
||||
@@ -7,7 +7,7 @@ from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from module.api import v1
|
||||
from module.api.config import _restore_sensitive, _sanitize_dict
|
||||
from module.api.config import _sanitize_dict, _restore_masked
|
||||
from module.models.config import Config
|
||||
from module.security.api import get_current_user
|
||||
|
||||
@@ -306,24 +306,20 @@ class TestSanitizeDict:
|
||||
|
||||
def test_nested_dict_recursed(self):
|
||||
"""Nested dicts are processed recursively."""
|
||||
result = _sanitize_dict({
|
||||
"downloader": {
|
||||
"host": "localhost",
|
||||
"password": "secret",
|
||||
result = _sanitize_dict(
|
||||
{
|
||||
"downloader": {
|
||||
"host": "localhost",
|
||||
"password": "secret",
|
||||
}
|
||||
}
|
||||
})
|
||||
)
|
||||
assert result["downloader"]["host"] == "localhost"
|
||||
assert result["downloader"]["password"] == "********"
|
||||
|
||||
def test_deeply_nested_dict(self):
|
||||
"""Deeply nested sensitive keys are masked."""
|
||||
result = _sanitize_dict({
|
||||
"level1": {
|
||||
"level2": {
|
||||
"api_key": "deep-secret"
|
||||
}
|
||||
}
|
||||
})
|
||||
result = _sanitize_dict({"level1": {"level2": {"api_key": "deep-secret"}}})
|
||||
assert result["level1"]["level2"]["api_key"] == "********"
|
||||
|
||||
def test_non_string_value_not_masked(self):
|
||||
@@ -338,17 +334,33 @@ class TestSanitizeDict:
|
||||
|
||||
def test_mixed_sensitive_and_plain(self):
|
||||
"""Mix of sensitive and plain keys handled correctly."""
|
||||
result = _sanitize_dict({
|
||||
"username": "admin",
|
||||
"password": "secret",
|
||||
"host": "10.0.0.1",
|
||||
"token": "jwt-abc",
|
||||
})
|
||||
result = _sanitize_dict(
|
||||
{
|
||||
"username": "admin",
|
||||
"password": "secret",
|
||||
"host": "10.0.0.1",
|
||||
"token": "jwt-abc",
|
||||
}
|
||||
)
|
||||
assert result["username"] == "admin"
|
||||
assert result["host"] == "10.0.0.1"
|
||||
assert result["password"] == "********"
|
||||
assert result["token"] == "********"
|
||||
|
||||
def test_sanitize_list_of_dicts(self):
|
||||
"""Lists containing dicts are recursed into."""
|
||||
result = _sanitize_dict(
|
||||
{
|
||||
"providers": [
|
||||
{"type": "telegram", "token": "secret-token"},
|
||||
{"type": "bark", "token": "another-secret"},
|
||||
]
|
||||
}
|
||||
)
|
||||
assert result["providers"][0]["token"] == "********"
|
||||
assert result["providers"][1]["token"] == "********"
|
||||
assert result["providers"][0]["type"] == "telegram"
|
||||
|
||||
def test_get_config_masks_sensitive_fields(self, authed_client):
|
||||
"""GET /config/get response masks password and api_key fields."""
|
||||
test_config = Config()
|
||||
@@ -363,54 +375,107 @@ class TestSanitizeDict:
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _restore_sensitive unit tests
|
||||
# _restore_masked unit tests (#995)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRestoreSensitive:
|
||||
def test_restores_masked_password(self):
|
||||
"""Masked password is replaced with the real value from current config."""
|
||||
class TestRestoreMasked:
|
||||
"""Issue #995: Masked passwords must not overwrite real credentials."""
|
||||
|
||||
def test_masked_password_restored(self):
|
||||
"""Masked password is replaced with the real stored value."""
|
||||
incoming = {"password": "********"}
|
||||
current = {"password": "realpassword"}
|
||||
result = _restore_sensitive(incoming, current)
|
||||
assert result["password"] == "realpassword"
|
||||
current = {"password": "real_secret"}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["password"] == "real_secret"
|
||||
|
||||
def test_non_masked_password_kept(self):
|
||||
"""A newly supplied password (not the placeholder) is kept as-is."""
|
||||
incoming = {"password": "newpassword"}
|
||||
current = {"password": "oldpassword"}
|
||||
result = _restore_sensitive(incoming, current)
|
||||
assert result["password"] == "newpassword"
|
||||
def test_new_password_preserved(self):
|
||||
"""Non-masked password value is kept as-is."""
|
||||
incoming = {"password": "new_password"}
|
||||
current = {"password": "old_password"}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["password"] == "new_password"
|
||||
|
||||
def test_non_sensitive_key_passed_through(self):
|
||||
"""Non-sensitive keys are never touched."""
|
||||
incoming = {"host": "192.168.1.1", "ssl": False}
|
||||
current = {"host": "10.0.0.1", "ssl": True}
|
||||
result = _restore_sensitive(incoming, current)
|
||||
assert result["host"] == "192.168.1.1"
|
||||
assert result["ssl"] is False
|
||||
def test_nested_masked_password_restored(self):
|
||||
"""Masked password inside nested dict is restored."""
|
||||
incoming = {"downloader": {"host": "10.0.0.1", "password": "********"}}
|
||||
current = {"downloader": {"host": "10.0.0.1", "password": "adminadmin"}}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["downloader"]["password"] == "adminadmin"
|
||||
|
||||
def test_nested_dict_restored(self):
|
||||
"""Masked values inside nested dicts are restored recursively."""
|
||||
incoming = {"downloader": {"host": "192.168.1.1", "password": "********"}}
|
||||
current = {"downloader": {"host": "10.0.0.1", "password": "realpassword"}}
|
||||
result = _restore_sensitive(incoming, current)
|
||||
assert result["downloader"]["host"] == "192.168.1.1"
|
||||
assert result["downloader"]["password"] == "realpassword"
|
||||
def test_nested_new_password_preserved(self):
|
||||
"""Non-masked password inside nested dict is kept."""
|
||||
incoming = {"downloader": {"password": "changed"}}
|
||||
current = {"downloader": {"password": "old"}}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["downloader"]["password"] == "changed"
|
||||
|
||||
def test_missing_key_in_current_keeps_placeholder(self):
|
||||
"""If a sensitive key has no counterpart in current, keep the incoming value."""
|
||||
incoming = {"password": "********"}
|
||||
current = {}
|
||||
result = _restore_sensitive(incoming, current)
|
||||
assert result["password"] == "********"
|
||||
def test_multiple_sensitive_fields(self):
|
||||
"""All sensitive fields are handled independently."""
|
||||
incoming = {
|
||||
"downloader": {"password": "********"},
|
||||
"proxy": {"password": "new_proxy_pass"},
|
||||
"experimental_openai": {"api_key": "********"},
|
||||
}
|
||||
current = {
|
||||
"downloader": {"password": "qb_pass"},
|
||||
"proxy": {"password": "old_proxy_pass"},
|
||||
"experimental_openai": {"api_key": "sk-real-key"},
|
||||
}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["downloader"]["password"] == "qb_pass"
|
||||
assert incoming["proxy"]["password"] == "new_proxy_pass"
|
||||
assert incoming["experimental_openai"]["api_key"] == "sk-real-key"
|
||||
|
||||
def test_update_config_preserves_password_when_masked(self, authed_client, mock_settings):
|
||||
"""PATCH /config/update must not overwrite a real password with '********'.
|
||||
def test_non_sensitive_mask_value_untouched(self):
|
||||
"""A non-sensitive key with '********' value is not modified."""
|
||||
incoming = {"host": "********"}
|
||||
current = {"host": "10.0.0.1"}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["host"] == "********"
|
||||
|
||||
Reproduces the bug where the frontend sends back the masked placeholder
|
||||
and the backend used to save it verbatim, corrupting the stored credential.
|
||||
"""
|
||||
def test_list_of_dicts_restored(self):
|
||||
"""Masked tokens inside list items are restored."""
|
||||
incoming = {
|
||||
"providers": [
|
||||
{"type": "telegram", "token": "********"},
|
||||
{"type": "bark", "token": "new-bark-token"},
|
||||
]
|
||||
}
|
||||
current = {
|
||||
"providers": [
|
||||
{"type": "telegram", "token": "real-tg-token"},
|
||||
{"type": "bark", "token": "old-bark-token"},
|
||||
]
|
||||
}
|
||||
_restore_masked(incoming, current)
|
||||
assert incoming["providers"][0]["token"] == "real-tg-token"
|
||||
assert incoming["providers"][1]["token"] == "new-bark-token"
|
||||
|
||||
def test_empty_dicts(self):
|
||||
"""Empty dicts don't cause errors."""
|
||||
_restore_masked({}, {})
|
||||
|
||||
def test_round_trip_preserves_credentials(self):
|
||||
"""Full round-trip: sanitize then restore recovers original values."""
|
||||
original = {
|
||||
"downloader": {"host": "10.0.0.1", "password": "secret123"},
|
||||
"experimental_openai": {"api_key": "sk-abc", "model": "gpt-4"},
|
||||
}
|
||||
sanitized = _sanitize_dict(original)
|
||||
assert sanitized["downloader"]["password"] == "********"
|
||||
assert sanitized["experimental_openai"]["api_key"] == "********"
|
||||
|
||||
_restore_masked(sanitized, original)
|
||||
assert sanitized["downloader"]["password"] == "secret123"
|
||||
assert sanitized["experimental_openai"]["api_key"] == "sk-abc"
|
||||
assert sanitized["downloader"]["host"] == "10.0.0.1"
|
||||
assert sanitized["experimental_openai"]["model"] == "gpt-4"
|
||||
|
||||
def test_update_config_preserves_password_when_masked(
|
||||
self, authed_client, mock_settings
|
||||
):
|
||||
"""PATCH /config/update must not overwrite a real password with '********'."""
|
||||
mock_settings.dict.return_value = {
|
||||
"program": {"rss_time": 900, "rename_time": 60, "webui_port": 7892},
|
||||
"downloader": {
|
||||
@@ -422,35 +487,85 @@ class TestRestoreSensitive:
|
||||
"ssl": True,
|
||||
},
|
||||
"rss_parser": {"enable": True, "filter": [], "language": "zh"},
|
||||
"bangumi_manage": {"enable": True, "eps_complete": False, "rename_method": "pn", "group_tag": False, "remove_bad_torrent": False},
|
||||
"bangumi_manage": {
|
||||
"enable": True,
|
||||
"eps_complete": False,
|
||||
"rename_method": "pn",
|
||||
"group_tag": False,
|
||||
"remove_bad_torrent": False,
|
||||
},
|
||||
"log": {"debug_enable": False},
|
||||
"proxy": {"enable": False, "type": "http", "host": "", "port": 0, "username": "", "password": ""},
|
||||
"notification": {"enable": False, "type": "telegram", "token": "", "chat_id": ""},
|
||||
"experimental_openai": {"enable": False, "api_key": "", "api_base": "https://api.openai.com/v1", "api_type": "openai", "api_version": "2023-05-15", "model": "gpt-3.5-turbo", "deployment_id": ""},
|
||||
"proxy": {
|
||||
"enable": False,
|
||||
"type": "http",
|
||||
"host": "",
|
||||
"port": 0,
|
||||
"username": "",
|
||||
"password": "",
|
||||
},
|
||||
"notification": {
|
||||
"enable": False,
|
||||
"type": "telegram",
|
||||
"token": "",
|
||||
"chat_id": "",
|
||||
},
|
||||
"experimental_openai": {
|
||||
"enable": False,
|
||||
"api_key": "",
|
||||
"api_base": "https://api.openai.com/v1",
|
||||
"api_type": "openai",
|
||||
"api_version": "2023-05-15",
|
||||
"model": "gpt-3.5-turbo",
|
||||
"deployment_id": "",
|
||||
},
|
||||
}
|
||||
# Frontend sends back masked password and ssl changed to False
|
||||
payload = {
|
||||
"program": {"rss_time": 900, "rename_time": 60, "webui_port": 7892},
|
||||
"downloader": {
|
||||
"type": "qbittorrent",
|
||||
"host": "192.168.1.1:8080",
|
||||
"username": "admin",
|
||||
"password": "********", # <-- masked placeholder from frontend
|
||||
"password": "********",
|
||||
"path": "/downloads",
|
||||
"ssl": False, # <-- the actual change the user made
|
||||
"ssl": False,
|
||||
},
|
||||
"rss_parser": {"enable": True, "filter": [], "language": "zh"},
|
||||
"bangumi_manage": {"enable": True, "eps_complete": False, "rename_method": "pn", "group_tag": False, "remove_bad_torrent": False},
|
||||
"bangumi_manage": {
|
||||
"enable": True,
|
||||
"eps_complete": False,
|
||||
"rename_method": "pn",
|
||||
"group_tag": False,
|
||||
"remove_bad_torrent": False,
|
||||
},
|
||||
"log": {"debug_enable": False},
|
||||
"proxy": {"enable": False, "type": "http", "host": "", "port": 0, "username": "", "password": ""},
|
||||
"notification": {"enable": False, "type": "telegram", "token": "", "chat_id": ""},
|
||||
"experimental_openai": {"enable": False, "api_key": "", "api_base": "https://api.openai.com/v1", "api_type": "openai", "api_version": "2023-05-15", "model": "gpt-3.5-turbo", "deployment_id": ""},
|
||||
"proxy": {
|
||||
"enable": False,
|
||||
"type": "http",
|
||||
"host": "",
|
||||
"port": 0,
|
||||
"username": "",
|
||||
"password": "",
|
||||
},
|
||||
"notification": {
|
||||
"enable": False,
|
||||
"type": "telegram",
|
||||
"token": "",
|
||||
"chat_id": "",
|
||||
},
|
||||
"experimental_openai": {
|
||||
"enable": False,
|
||||
"api_key": "",
|
||||
"api_base": "https://api.openai.com/v1",
|
||||
"api_type": "openai",
|
||||
"api_version": "2023-05-15",
|
||||
"model": "gpt-3.5-turbo",
|
||||
"deployment_id": "",
|
||||
},
|
||||
}
|
||||
with patch("module.api.config.settings", mock_settings):
|
||||
response = authed_client.patch("/api/v1/config/update", json=payload)
|
||||
|
||||
assert response.status_code == 200
|
||||
# The dict passed to save() must have the real password, not '********'
|
||||
saved = mock_settings.save.call_args[1]["config_dict"]
|
||||
assert saved["downloader"]["password"] == "realpassword"
|
||||
assert saved["downloader"]["ssl"] is False
|
||||
|
||||
@@ -124,57 +124,109 @@ class TestSetupGuard:
|
||||
|
||||
|
||||
class TestTestDownloader:
|
||||
def test_private_ip_accepted(self, client, mock_first_run):
|
||||
"""Issue #1001: Private IPs must not be rejected for downloader test."""
|
||||
import httpx
|
||||
|
||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||
mock_instance = AsyncMock()
|
||||
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||
return_value=mock_instance
|
||||
)
|
||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
response = client.post(
|
||||
"/api/v1/setup/test-downloader",
|
||||
json={
|
||||
"type": "qbittorrent",
|
||||
"host": "192.168.1.100:8080",
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ssl": False,
|
||||
},
|
||||
)
|
||||
# Should reach the connection attempt, not get blocked by IP validation
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["success"] is False
|
||||
assert "connect" in data["message_en"].lower()
|
||||
|
||||
def test_loopback_ip_accepted(self, client, mock_first_run):
|
||||
"""Issue #1001: Loopback IPs must not be rejected for downloader test."""
|
||||
import httpx
|
||||
|
||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||
mock_instance = AsyncMock()
|
||||
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||
return_value=mock_instance
|
||||
)
|
||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
response = client.post(
|
||||
"/api/v1/setup/test-downloader",
|
||||
json={
|
||||
"type": "qbittorrent",
|
||||
"host": "127.0.0.1:8080",
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ssl": False,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["success"] is False
|
||||
|
||||
def test_connection_timeout(self, client, mock_first_run):
|
||||
import httpx
|
||||
|
||||
with patch("module.api.setup._validate_url"):
|
||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||
mock_instance = AsyncMock()
|
||||
mock_instance.get.side_effect = httpx.TimeoutException("timeout")
|
||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||
return_value=mock_instance
|
||||
)
|
||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||
mock_instance = AsyncMock()
|
||||
mock_instance.get.side_effect = httpx.TimeoutException("timeout")
|
||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||
return_value=mock_instance
|
||||
)
|
||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
response = client.post(
|
||||
"/api/v1/setup/test-downloader",
|
||||
json={
|
||||
"type": "qbittorrent",
|
||||
"host": "localhost:8080",
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ssl": False,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["success"] is False
|
||||
response = client.post(
|
||||
"/api/v1/setup/test-downloader",
|
||||
json={
|
||||
"type": "qbittorrent",
|
||||
"host": "localhost:8080",
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ssl": False,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["success"] is False
|
||||
|
||||
def test_connection_refused(self, client, mock_first_run):
|
||||
import httpx
|
||||
|
||||
with patch("module.api.setup._validate_url"):
|
||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||
mock_instance = AsyncMock()
|
||||
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||
return_value=mock_instance
|
||||
)
|
||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
with patch("module.api.setup.httpx.AsyncClient") as mock_client_cls:
|
||||
mock_instance = AsyncMock()
|
||||
mock_instance.get.side_effect = httpx.ConnectError("refused")
|
||||
mock_client_cls.return_value.__aenter__ = AsyncMock(
|
||||
return_value=mock_instance
|
||||
)
|
||||
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
response = client.post(
|
||||
"/api/v1/setup/test-downloader",
|
||||
json={
|
||||
"type": "qbittorrent",
|
||||
"host": "localhost:8080",
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ssl": False,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["success"] is False
|
||||
response = client.post(
|
||||
"/api/v1/setup/test-downloader",
|
||||
json={
|
||||
"type": "qbittorrent",
|
||||
"host": "localhost:8080",
|
||||
"username": "admin",
|
||||
"password": "admin",
|
||||
"ssl": False,
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["success"] is False
|
||||
|
||||
|
||||
class TestTestRSS:
|
||||
|
||||
Reference in New Issue
Block a user