mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 03:57:30 +08:00
Merge branch 'jxxghp:v2' into v2
This commit is contained in:
@@ -1136,6 +1136,38 @@ meta_cases = [{
|
||||
"tmdbid": 27205,
|
||||
"fps": None
|
||||
}
|
||||
}, {
|
||||
"path": "/movies/Breaking Bad (2008) [tmdb=1396]/Season 2/",
|
||||
"target": {
|
||||
"type": "电视剧",
|
||||
"cn_name": "",
|
||||
"en_name": "Breaking Bad",
|
||||
"year": "2008",
|
||||
"part": "",
|
||||
"season": "S02",
|
||||
"episode": "",
|
||||
"restype": "",
|
||||
"pix": "",
|
||||
"video_codec": "",
|
||||
"audio_codec": "",
|
||||
"tmdbid": 1396
|
||||
}
|
||||
}, {
|
||||
"path": "/movies/Breaking Bad (2008) [tmdb=1396]/S2/",
|
||||
"target": {
|
||||
"type": "电视剧",
|
||||
"cn_name": "",
|
||||
"en_name": "Breaking Bad",
|
||||
"year": "2008",
|
||||
"part": "",
|
||||
"season": "S02",
|
||||
"episode": "",
|
||||
"restype": "",
|
||||
"pix": "",
|
||||
"video_codec": "",
|
||||
"audio_codec": "",
|
||||
"tmdbid": 1396
|
||||
}
|
||||
}, {
|
||||
"path": "/movies/Breaking Bad (2008) [tmdb=1396]/Season 1/Breaking.Bad.S01E01.1080p.mkv",
|
||||
"target": {
|
||||
|
||||
299
tests/manual/ugreen_media_cli.py
Normal file
299
tests/manual/ugreen_media_cli.py
Normal file
@@ -0,0 +1,299 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import getpass
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
from typing import Any, Mapping
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
# 兼容直接运行脚本:避免 app/utils 被放在 sys.path 首位导致标准库模块被同名文件遮蔽
|
||||
if __name__ == "__main__" and __package__ is None:
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
project_root = os.path.abspath(os.path.join(script_dir, "..", ".."))
|
||||
if script_dir in sys.path:
|
||||
sys.path.remove(script_dir)
|
||||
if project_root not in sys.path:
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
import requests
|
||||
|
||||
from app.utils.ugreen_crypto import UgreenCrypto
|
||||
|
||||
|
||||
class UgreenLoginError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _normalize_base_url(raw: str) -> str:
|
||||
value = (raw or "").strip()
|
||||
if not value:
|
||||
raise UgreenLoginError("服务器地址不能为空")
|
||||
if not value.startswith(("http://", "https://")):
|
||||
value = f"http://{value}"
|
||||
parsed = urlsplit(value)
|
||||
if not parsed.netloc:
|
||||
raise UgreenLoginError(f"无效服务器地址: {raw}")
|
||||
return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/")
|
||||
|
||||
|
||||
def _json_or_raise(resp: requests.Response, stage: str) -> dict[str, Any]:
|
||||
try:
|
||||
data = resp.json()
|
||||
except Exception as exc: # pragma: no cover - 网络异常路径
|
||||
raise UgreenLoginError(
|
||||
f"{stage} 返回非 JSON,HTTP {resp.status_code},响应片段: {resp.text[:200]}"
|
||||
) from exc
|
||||
if not isinstance(data, dict):
|
||||
raise UgreenLoginError(f"{stage} 返回格式异常: {type(data).__name__}")
|
||||
return data
|
||||
|
||||
|
||||
def _decode_public_key(raw: str) -> str:
|
||||
value = (raw or "").strip()
|
||||
if not value:
|
||||
raise UgreenLoginError("未获取到公钥")
|
||||
if "BEGIN" in value:
|
||||
return value
|
||||
try:
|
||||
return base64.b64decode(value).decode("utf-8")
|
||||
except Exception as exc:
|
||||
raise UgreenLoginError("公钥解码失败") from exc
|
||||
|
||||
|
||||
def _raise_if_failed(payload: Mapping[str, Any], stage: str) -> None:
|
||||
if payload.get("code") == 200:
|
||||
return
|
||||
raise UgreenLoginError(
|
||||
f"{stage}失败: code={payload.get('code')} msg={payload.get('msg')}"
|
||||
)
|
||||
|
||||
|
||||
def _build_common_headers(
|
||||
client_id: str, client_version: str, language: str
|
||||
) -> dict[str, str]:
|
||||
return {
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"Client-Id": client_id,
|
||||
"Client-Version": client_version,
|
||||
"UG-Agent": "PC/WEB",
|
||||
"X-Specify-Language": language,
|
||||
}
|
||||
|
||||
|
||||
def _login_and_get_access(
|
||||
session: requests.Session,
|
||||
base_url: str,
|
||||
username: str,
|
||||
password: str,
|
||||
keepalive: bool,
|
||||
headers: Mapping[str, str],
|
||||
timeout: float,
|
||||
verify_ssl: bool,
|
||||
) -> tuple[str, str]:
|
||||
check_resp = session.post(
|
||||
f"{base_url}/ugreen/v1/verify/check",
|
||||
json={"username": username},
|
||||
headers=dict(headers),
|
||||
timeout=timeout,
|
||||
verify=verify_ssl,
|
||||
)
|
||||
check_json = _json_or_raise(check_resp, "获取登录公钥")
|
||||
_raise_if_failed(check_json, "获取登录公钥")
|
||||
|
||||
rsa_token = (
|
||||
check_resp.headers.get("x-rsa-token")
|
||||
or check_resp.headers.get("X-Rsa-Token")
|
||||
or check_json.get("xRsaToken")
|
||||
or check_json.get("x-rsa-token")
|
||||
)
|
||||
if not rsa_token:
|
||||
data = check_json.get("data")
|
||||
if isinstance(data, Mapping):
|
||||
rsa_token = data.get("xRsaToken") or data.get("x-rsa-token")
|
||||
if not rsa_token:
|
||||
raise UgreenLoginError("登录公钥为空(x-rsa-token)")
|
||||
|
||||
login_public_key = _decode_public_key(str(rsa_token))
|
||||
encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long(
|
||||
password
|
||||
)
|
||||
|
||||
login_payload = {
|
||||
"username": username,
|
||||
"password": encrypted_password,
|
||||
"keepalive": keepalive,
|
||||
"otp": True,
|
||||
"is_simple": True,
|
||||
}
|
||||
login_resp = session.post(
|
||||
f"{base_url}/ugreen/v1/verify/login",
|
||||
json=login_payload,
|
||||
headers=dict(headers),
|
||||
timeout=timeout,
|
||||
verify=verify_ssl,
|
||||
)
|
||||
login_json = _json_or_raise(login_resp, "登录")
|
||||
_raise_if_failed(login_json, "登录")
|
||||
|
||||
data = login_json.get("data")
|
||||
if not isinstance(data, Mapping):
|
||||
raise UgreenLoginError("登录成功但响应 data 为空")
|
||||
|
||||
token = str(data.get("token") or "").strip()
|
||||
public_key = str(data.get("public_key") or "").strip()
|
||||
if not token:
|
||||
raise UgreenLoginError("登录成功但未拿到 token")
|
||||
if not public_key:
|
||||
raise UgreenLoginError("登录成功但未拿到 public_key")
|
||||
return token, _decode_public_key(public_key)
|
||||
|
||||
|
||||
def _fetch_media_lib(
|
||||
session: requests.Session,
|
||||
base_url: str,
|
||||
token: str,
|
||||
public_key: str,
|
||||
client_id: str,
|
||||
client_version: str,
|
||||
language: str,
|
||||
page: int,
|
||||
page_size: int,
|
||||
timeout: float,
|
||||
verify_ssl: bool,
|
||||
) -> Any:
|
||||
crypto = UgreenCrypto(
|
||||
public_key=public_key,
|
||||
token=token,
|
||||
client_id=client_id,
|
||||
client_version=client_version,
|
||||
ug_agent="PC/WEB",
|
||||
language=language,
|
||||
)
|
||||
req = crypto.build_encrypted_request(
|
||||
url=f"{base_url}/ugreen/v1/video/homepage/media_list",
|
||||
method="GET",
|
||||
params={"page": page, "page_size": page_size},
|
||||
)
|
||||
media_resp = session.get(
|
||||
req.url,
|
||||
headers=req.headers,
|
||||
params=req.params,
|
||||
timeout=timeout,
|
||||
verify=verify_ssl,
|
||||
)
|
||||
media_json = _json_or_raise(media_resp, "获取媒体库")
|
||||
return crypto.decrypt_response(media_json, req.aes_key)
|
||||
|
||||
|
||||
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="登录绿联 NAS 并调用媒体库接口(自动处理请求加密/响应解密)"
|
||||
)
|
||||
parser.add_argument("--host", help="服务器地址,例如: http://192.168.20.101:9999")
|
||||
parser.add_argument("--username", help="用户名")
|
||||
parser.add_argument("--password", help="密码(不传则交互输入)")
|
||||
parser.add_argument("--client-id", help="可选,默认自动生成 UUID-WEB")
|
||||
parser.add_argument("--client-version", default="76363", help="默认: 76363")
|
||||
parser.add_argument("--language", default="zh-CN", help="默认: zh-CN")
|
||||
parser.add_argument("--page", type=int, default=1, help="默认: 1")
|
||||
parser.add_argument("--page-size", type=int, default=50, help="默认: 50")
|
||||
parser.add_argument("--timeout", type=float, default=20.0, help="默认: 20 秒")
|
||||
parser.add_argument("--insecure", action="store_true", help="忽略 HTTPS 证书校验")
|
||||
parser.add_argument(
|
||||
"--no-keepalive",
|
||||
action="store_true",
|
||||
help="关闭保持登录(默认保持登录)",
|
||||
)
|
||||
parser.add_argument("--pretty", action="store_true", help="美化输出 JSON")
|
||||
parser.add_argument("--output", help="将解密后的结果写入文件")
|
||||
return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = parse_args(argv or sys.argv[1:])
|
||||
|
||||
host = args.host or input("服务器地址: ").strip()
|
||||
username = args.username or input("用户名: ").strip()
|
||||
password = args.password or getpass.getpass("密码: ")
|
||||
client_id = (args.client_id or f"{uuid.uuid4()}-WEB").strip()
|
||||
keepalive = not args.no_keepalive
|
||||
verify_ssl = not args.insecure
|
||||
|
||||
try:
|
||||
base_url = _normalize_base_url(host)
|
||||
if args.insecure:
|
||||
requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined]
|
||||
|
||||
session = requests.Session()
|
||||
headers = _build_common_headers(
|
||||
client_id=client_id,
|
||||
client_version=args.client_version,
|
||||
language=args.language,
|
||||
)
|
||||
|
||||
token, public_key = _login_and_get_access(
|
||||
session=session,
|
||||
base_url=base_url,
|
||||
username=username,
|
||||
password=password,
|
||||
keepalive=keepalive,
|
||||
headers=headers,
|
||||
timeout=args.timeout,
|
||||
verify_ssl=verify_ssl,
|
||||
)
|
||||
decoded = _fetch_media_lib(
|
||||
session=session,
|
||||
base_url=base_url,
|
||||
token=token,
|
||||
public_key=public_key,
|
||||
client_id=client_id,
|
||||
client_version=args.client_version,
|
||||
language=args.language,
|
||||
page=args.page,
|
||||
page_size=args.page_size,
|
||||
timeout=args.timeout,
|
||||
verify_ssl=verify_ssl,
|
||||
)
|
||||
|
||||
if isinstance(decoded, Mapping):
|
||||
if decoded.get("code") != 200:
|
||||
raise UgreenLoginError(
|
||||
f"媒体库接口失败: code={decoded.get('code')} msg={decoded.get('msg')}"
|
||||
)
|
||||
media_count = None
|
||||
data = decoded.get("data")
|
||||
if isinstance(data, Mapping) and isinstance(data.get("media_lib_info_list"), list):
|
||||
media_count = len(data["media_lib_info_list"])
|
||||
print(
|
||||
f"调用成功: code={decoded.get('code')} msg={decoded.get('msg')} "
|
||||
f"media_lib_info_list={media_count}"
|
||||
)
|
||||
|
||||
text = json.dumps(
|
||||
decoded,
|
||||
ensure_ascii=False,
|
||||
indent=2 if args.pretty else None,
|
||||
separators=(",", ":") if not args.pretty else None,
|
||||
)
|
||||
if args.output:
|
||||
with open(args.output, "w", encoding="utf-8") as f:
|
||||
f.write(text)
|
||||
f.write("\n")
|
||||
print(f"解密结果已写入: {args.output}")
|
||||
else:
|
||||
print(text)
|
||||
return 0
|
||||
except UgreenLoginError as exc:
|
||||
print(f"错误: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
except requests.RequestException as exc:
|
||||
print(f"网络错误: {exc}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
113
tests/test_ugreen_api.py
Normal file
113
tests/test_ugreen_api.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import unittest
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch
|
||||
|
||||
from app.modules.ugreen.api import Api
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, payload: dict, headers: dict | None = None):
|
||||
self._payload = payload
|
||||
self.headers = headers or {}
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
class _FakeSession:
|
||||
def __init__(self, get_responses=None, post_responses=None):
|
||||
self._get_responses = list(get_responses or [])
|
||||
self._post_responses = list(post_responses or [])
|
||||
self.calls: list[tuple[str, dict]] = []
|
||||
self.cookies = SimpleNamespace(
|
||||
get_dict=lambda: {},
|
||||
update=lambda *_args, **_kwargs: None,
|
||||
)
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
if args:
|
||||
kwargs = {"url": args[0], **kwargs}
|
||||
self.calls.append(("GET", kwargs))
|
||||
return self._get_responses.pop(0) if self._get_responses else _FakeResponse({})
|
||||
|
||||
def post(self, *args, **kwargs):
|
||||
if args:
|
||||
kwargs = {"url": args[0], **kwargs}
|
||||
self.calls.append(("POST", kwargs))
|
||||
return self._post_responses.pop(0) if self._post_responses else _FakeResponse({})
|
||||
|
||||
@staticmethod
|
||||
def close():
|
||||
return None
|
||||
|
||||
|
||||
class _FakeCrypto:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def rsa_encrypt_long(raw: str) -> str:
|
||||
return f"enc:{raw}"
|
||||
|
||||
@staticmethod
|
||||
def build_encrypted_request(url: str, method: str = "GET", params=None, **kwargs):
|
||||
return SimpleNamespace(url=url, headers={}, params=params or {}, json=None, aes_key="k")
|
||||
|
||||
@staticmethod
|
||||
def decrypt_response(payload, aes_key):
|
||||
return payload
|
||||
|
||||
|
||||
class UgreenApiVerifySslTest(unittest.TestCase):
|
||||
def test_request_json_default_verify_ssl_true(self):
|
||||
api = Api(host="https://example.com")
|
||||
fake_session = _FakeSession(
|
||||
get_responses=[_FakeResponse({"code": 200})],
|
||||
post_responses=[_FakeResponse({"code": 200})],
|
||||
)
|
||||
api._session = fake_session
|
||||
|
||||
api._request_json(url="https://example.com/a", method="GET")
|
||||
api._request_json(url="https://example.com/b", method="POST", json_data={"x": 1})
|
||||
|
||||
self.assertEqual(fake_session.calls[0][1].get("verify"), True)
|
||||
self.assertEqual(fake_session.calls[1][1].get("verify"), True)
|
||||
|
||||
def test_login_logout_follow_verify_ssl_flag(self):
|
||||
api = Api(host="https://example.com", verify_ssl=False)
|
||||
fake_session = _FakeSession(
|
||||
get_responses=[_FakeResponse({})],
|
||||
post_responses=[
|
||||
_FakeResponse({"code": 200, "msg": "ok", "data": {}}, headers={"x-rsa-token": "BEGIN TEST"}),
|
||||
_FakeResponse(
|
||||
{
|
||||
"code": 200,
|
||||
"msg": "ok",
|
||||
"data": {
|
||||
"token": "token-value",
|
||||
"public_key": "BEGIN LOGIN KEY",
|
||||
"static_token": "static-token",
|
||||
"is_ugk": False,
|
||||
},
|
||||
}
|
||||
),
|
||||
],
|
||||
)
|
||||
api._session = fake_session
|
||||
|
||||
with patch("app.modules.ugreen.api.UgreenCrypto", _FakeCrypto):
|
||||
token = api.login("tester", "pwd")
|
||||
self.assertEqual(token, "token-value")
|
||||
api.logout()
|
||||
|
||||
self.assertEqual(len(fake_session.calls), 3)
|
||||
self.assertEqual(fake_session.calls[0][0], "POST")
|
||||
self.assertEqual(fake_session.calls[1][0], "POST")
|
||||
self.assertEqual(fake_session.calls[2][0], "GET")
|
||||
self.assertEqual(fake_session.calls[0][1].get("verify"), False)
|
||||
self.assertEqual(fake_session.calls[1][1].get("verify"), False)
|
||||
self.assertEqual(fake_session.calls[2][1].get("verify"), False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
95
tests/test_ugreen_crypto.py
Normal file
95
tests/test_ugreen_crypto.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||
|
||||
from app.utils.ugreen_crypto import UgreenCrypto
|
||||
|
||||
|
||||
def _generate_rsa_keys() -> tuple[str, rsa.RSAPrivateKey]:
|
||||
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||
public_pem = private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.PKCS1,
|
||||
).decode("utf-8")
|
||||
return public_pem, private_key
|
||||
|
||||
|
||||
def _rsa_decrypt_long(private_key: rsa.RSAPrivateKey, payload_b64: str) -> str:
|
||||
encrypted = base64.b64decode(payload_b64)
|
||||
chunk_size = private_key.key_size // 8
|
||||
plain_chunks = []
|
||||
for start in range(0, len(encrypted), chunk_size):
|
||||
chunk = encrypted[start : start + chunk_size]
|
||||
plain_chunks.append(private_key.decrypt(chunk, padding.PKCS1v15()))
|
||||
return b"".join(plain_chunks).decode("utf-8")
|
||||
|
||||
|
||||
class UgreenCryptoTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.public_key, self.private_key = _generate_rsa_keys()
|
||||
self.token = "demo-token-for-test"
|
||||
self.crypto = UgreenCrypto(
|
||||
public_key=self.public_key,
|
||||
token=self.token,
|
||||
client_id="test-client-id",
|
||||
)
|
||||
|
||||
def test_rsa_encrypt_long(self):
|
||||
plain = "A" * 400
|
||||
encrypted = self.crypto.rsa_encrypt_long(plain)
|
||||
self.assertEqual(plain, _rsa_decrypt_long(self.private_key, encrypted))
|
||||
|
||||
def test_build_encrypted_request_and_decrypt_response(self):
|
||||
req = self.crypto.build_encrypted_request(
|
||||
url="http://127.0.0.1:9999/ugreen/v1/video/homepage/media_list",
|
||||
params={"page": 1, "page_size": 50},
|
||||
data={"foo": "bar", "count": 2},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
req.plain_query,
|
||||
"page=1&page_size=50",
|
||||
)
|
||||
self.assertEqual(
|
||||
req.plain_query,
|
||||
self.crypto.aes_gcm_decrypt(req.params["encrypt_query"], req.aes_key),
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
req.headers["X-Ugreen-Security-Key"],
|
||||
hashlib.md5(self.token.encode("utf-8")).hexdigest(),
|
||||
)
|
||||
self.assertEqual(
|
||||
req.aes_key,
|
||||
_rsa_decrypt_long(self.private_key, req.headers["X-Ugreen-Security-Code"]),
|
||||
)
|
||||
self.assertEqual(
|
||||
self.token,
|
||||
_rsa_decrypt_long(self.private_key, req.headers["X-Ugreen-Token"]),
|
||||
)
|
||||
|
||||
encrypted_body = req.json["encrypt_req_body"]
|
||||
body_plain = self.crypto.aes_gcm_decrypt(encrypted_body, req.aes_key)
|
||||
self.assertEqual(json.loads(body_plain), {"foo": "bar", "count": 2})
|
||||
self.assertEqual(
|
||||
req.json["req_body_sha256"],
|
||||
hashlib.sha256(body_plain.encode("utf-8")).hexdigest(),
|
||||
)
|
||||
|
||||
server_payload = {"code": 0, "msg": "ok", "data": {"items": [1, 2, 3]}}
|
||||
resp = {
|
||||
"encrypt_resp_body": self.crypto.aes_gcm_encrypt(
|
||||
json.dumps(server_payload, ensure_ascii=False, separators=(",", ":")),
|
||||
req.aes_key,
|
||||
)
|
||||
}
|
||||
decoded = self.crypto.decrypt_response(resp, req.aes_key)
|
||||
self.assertEqual(decoded, server_payload)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
188
tests/test_ugreen_mediaserver.py
Normal file
188
tests/test_ugreen_mediaserver.py
Normal file
@@ -0,0 +1,188 @@
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
import importlib.util
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
from app import schemas
|
||||
|
||||
try:
|
||||
from app.api.endpoints import dashboard as dashboard_endpoint
|
||||
except Exception:
|
||||
dashboard_endpoint = None
|
||||
|
||||
|
||||
def _load_ugreen_class():
|
||||
"""
|
||||
在测试中动态加载 Ugreen,避免受可选依赖(如 pyquery/sqlalchemy)影响。
|
||||
"""
|
||||
module_name = "_test_ugreen_module"
|
||||
if module_name in sys.modules:
|
||||
return sys.modules[module_name].Ugreen
|
||||
|
||||
# 轻量日志桩
|
||||
if "app.log" not in sys.modules:
|
||||
log_module = types.ModuleType("app.log")
|
||||
|
||||
class _Logger:
|
||||
def info(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def warning(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def error(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def debug(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
log_module.logger = _Logger()
|
||||
sys.modules["app.log"] = log_module
|
||||
|
||||
# SystemConfigOper 桩
|
||||
if "app.db.systemconfig_oper" not in sys.modules:
|
||||
db_module = types.ModuleType("app.db.systemconfig_oper")
|
||||
|
||||
class _SystemConfigOper:
|
||||
@staticmethod
|
||||
def get(_key):
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def set(_key, _value):
|
||||
return None
|
||||
|
||||
db_module.SystemConfigOper = _SystemConfigOper
|
||||
sys.modules["app.db.systemconfig_oper"] = db_module
|
||||
|
||||
# app.modules / app.modules.ugreen / app.modules.ugreen.api 桩
|
||||
if "app.modules" not in sys.modules:
|
||||
pkg = types.ModuleType("app.modules")
|
||||
pkg.__path__ = []
|
||||
sys.modules["app.modules"] = pkg
|
||||
if "app.modules.ugreen" not in sys.modules:
|
||||
subpkg = types.ModuleType("app.modules.ugreen")
|
||||
subpkg.__path__ = []
|
||||
sys.modules["app.modules.ugreen"] = subpkg
|
||||
if "app.modules.ugreen.api" not in sys.modules:
|
||||
api_module = types.ModuleType("app.modules.ugreen.api")
|
||||
|
||||
class _Api:
|
||||
host = ""
|
||||
token = None
|
||||
|
||||
api_module.Api = _Api
|
||||
sys.modules["app.modules.ugreen.api"] = api_module
|
||||
|
||||
ugreen_path = Path(__file__).resolve().parents[1] / "app" / "modules" / "ugreen" / "ugreen.py"
|
||||
spec = importlib.util.spec_from_file_location(module_name, ugreen_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
assert spec and spec.loader
|
||||
spec.loader.exec_module(module)
|
||||
return module.Ugreen
|
||||
|
||||
|
||||
Ugreen = _load_ugreen_class()
|
||||
|
||||
|
||||
class _FakeUgreenApi:
|
||||
host = "http://127.0.0.1:9999"
|
||||
token = "test-token"
|
||||
|
||||
@staticmethod
|
||||
def video_all(classification: int, page: int = 1, page_size: int = 1):
|
||||
if classification == -102:
|
||||
return {"total_num": 12}
|
||||
if classification == -103:
|
||||
return {"total_num": 34}
|
||||
return {"total_num": 0}
|
||||
|
||||
|
||||
class UgreenScanModeTest(unittest.TestCase):
|
||||
def test_resolve_scan_type(self):
|
||||
resolve = Ugreen._Ugreen__resolve_scan_type
|
||||
|
||||
self.assertEqual(resolve(scan_mode="new_and_modified"), 1)
|
||||
self.assertEqual(resolve(scan_mode="supplement_missing"), 2)
|
||||
self.assertEqual(resolve(scan_mode="full_override"), 3)
|
||||
|
||||
self.assertEqual(resolve(scan_mode="1"), 1)
|
||||
self.assertEqual(resolve(scan_mode="2"), 2)
|
||||
self.assertEqual(resolve(scan_mode="3"), 3)
|
||||
|
||||
self.assertEqual(resolve(scan_type=1), 1)
|
||||
self.assertEqual(resolve(scan_type=2), 2)
|
||||
self.assertEqual(resolve(scan_type=3), 3)
|
||||
|
||||
self.assertEqual(resolve(scan_mode="unknown"), 2)
|
||||
self.assertEqual(resolve(), 2)
|
||||
|
||||
|
||||
class UgreenVerifySslTest(unittest.TestCase):
|
||||
def test_resolve_verify_ssl(self):
|
||||
resolve = Ugreen._Ugreen__resolve_verify_ssl
|
||||
self.assertEqual(resolve(True), True)
|
||||
self.assertEqual(resolve(False), False)
|
||||
self.assertEqual(resolve("true"), True)
|
||||
self.assertEqual(resolve("1"), True)
|
||||
self.assertEqual(resolve("false"), False)
|
||||
self.assertEqual(resolve("0"), False)
|
||||
self.assertEqual(resolve(None), True)
|
||||
|
||||
|
||||
class UgreenStatisticTest(unittest.TestCase):
|
||||
def test_get_medias_count_episode_is_none(self):
|
||||
ugreen = Ugreen.__new__(Ugreen)
|
||||
ugreen._host = "http://127.0.0.1:9999"
|
||||
ugreen._username = "tester"
|
||||
ugreen._password = "secret"
|
||||
ugreen._userinfo = {"name": "tester"}
|
||||
ugreen._api = _FakeUgreenApi()
|
||||
|
||||
stat = ugreen.get_medias_count()
|
||||
self.assertEqual(stat.movie_count, 12)
|
||||
self.assertEqual(stat.tv_count, 34)
|
||||
self.assertIsNone(stat.episode_count)
|
||||
|
||||
|
||||
class DashboardStatisticTest(unittest.TestCase):
|
||||
@unittest.skipIf(dashboard_endpoint is None, "dashboard endpoint dependencies are missing")
|
||||
def test_statistic_all_episode_missing(self):
|
||||
mocked_stats = [
|
||||
schemas.Statistic(movie_count=10, tv_count=20, episode_count=None, user_count=2),
|
||||
schemas.Statistic(movie_count=1, tv_count=2, episode_count=None, user_count=1),
|
||||
]
|
||||
with patch(
|
||||
"app.api.endpoints.dashboard.DashboardChain.media_statistic",
|
||||
return_value=mocked_stats,
|
||||
):
|
||||
ret = dashboard_endpoint.statistic(name="ugreen", _=None)
|
||||
|
||||
self.assertEqual(ret.movie_count, 11)
|
||||
self.assertEqual(ret.tv_count, 22)
|
||||
self.assertEqual(ret.user_count, 3)
|
||||
self.assertIsNone(ret.episode_count)
|
||||
|
||||
@unittest.skipIf(dashboard_endpoint is None, "dashboard endpoint dependencies are missing")
|
||||
def test_statistic_mixed_episode_count(self):
|
||||
mocked_stats = [
|
||||
schemas.Statistic(movie_count=10, tv_count=20, episode_count=None, user_count=2),
|
||||
schemas.Statistic(movie_count=1, tv_count=2, episode_count=6, user_count=1),
|
||||
]
|
||||
with patch(
|
||||
"app.api.endpoints.dashboard.DashboardChain.media_statistic",
|
||||
return_value=mocked_stats,
|
||||
):
|
||||
ret = dashboard_endpoint.statistic(name="all", _=None)
|
||||
|
||||
self.assertEqual(ret.movie_count, 11)
|
||||
self.assertEqual(ret.tv_count, 22)
|
||||
self.assertEqual(ret.user_count, 3)
|
||||
self.assertEqual(ret.episode_count, 6)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user