Files
MoviePilot/tests/manual/ugreen_media_cli.py

300 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import base64
import getpass
import json
import os
import sys
import uuid
from typing import Any, Mapping
from urllib.parse import urlsplit, urlunsplit
# 兼容直接运行脚本:避免 app/utils 被放在 sys.path 首位导致标准库模块被同名文件遮蔽
if __name__ == "__main__" and __package__ is None:
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(script_dir, "..", ".."))
if script_dir in sys.path:
sys.path.remove(script_dir)
if project_root not in sys.path:
sys.path.insert(0, project_root)
import requests
from app.utils.ugreen_crypto import UgreenCrypto
class UgreenLoginError(Exception):
pass
def _normalize_base_url(raw: str) -> str:
value = (raw or "").strip()
if not value:
raise UgreenLoginError("服务器地址不能为空")
if not value.startswith(("http://", "https://")):
value = f"http://{value}"
parsed = urlsplit(value)
if not parsed.netloc:
raise UgreenLoginError(f"无效服务器地址: {raw}")
return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/")
def _json_or_raise(resp: requests.Response, stage: str) -> dict[str, Any]:
try:
data = resp.json()
except Exception as exc: # pragma: no cover - 网络异常路径
raise UgreenLoginError(
f"{stage} 返回非 JSONHTTP {resp.status_code},响应片段: {resp.text[:200]}"
) from exc
if not isinstance(data, dict):
raise UgreenLoginError(f"{stage} 返回格式异常: {type(data).__name__}")
return data
def _decode_public_key(raw: str) -> str:
value = (raw or "").strip()
if not value:
raise UgreenLoginError("未获取到公钥")
if "BEGIN" in value:
return value
try:
return base64.b64decode(value).decode("utf-8")
except Exception as exc:
raise UgreenLoginError("公钥解码失败") from exc
def _raise_if_failed(payload: Mapping[str, Any], stage: str) -> None:
if payload.get("code") == 200:
return
raise UgreenLoginError(
f"{stage}失败: code={payload.get('code')} msg={payload.get('msg')}"
)
def _build_common_headers(
client_id: str, client_version: str, language: str
) -> dict[str, str]:
return {
"Accept": "application/json, text/plain, */*",
"Client-Id": client_id,
"Client-Version": client_version,
"UG-Agent": "PC/WEB",
"X-Specify-Language": language,
}
def _login_and_get_access(
session: requests.Session,
base_url: str,
username: str,
password: str,
keepalive: bool,
headers: Mapping[str, str],
timeout: float,
verify_ssl: bool,
) -> tuple[str, str]:
check_resp = session.post(
f"{base_url}/ugreen/v1/verify/check",
json={"username": username},
headers=dict(headers),
timeout=timeout,
verify=verify_ssl,
)
check_json = _json_or_raise(check_resp, "获取登录公钥")
_raise_if_failed(check_json, "获取登录公钥")
rsa_token = (
check_resp.headers.get("x-rsa-token")
or check_resp.headers.get("X-Rsa-Token")
or check_json.get("xRsaToken")
or check_json.get("x-rsa-token")
)
if not rsa_token:
data = check_json.get("data")
if isinstance(data, Mapping):
rsa_token = data.get("xRsaToken") or data.get("x-rsa-token")
if not rsa_token:
raise UgreenLoginError("登录公钥为空x-rsa-token")
login_public_key = _decode_public_key(str(rsa_token))
encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long(
password
)
login_payload = {
"username": username,
"password": encrypted_password,
"keepalive": keepalive,
"otp": True,
"is_simple": True,
}
login_resp = session.post(
f"{base_url}/ugreen/v1/verify/login",
json=login_payload,
headers=dict(headers),
timeout=timeout,
verify=verify_ssl,
)
login_json = _json_or_raise(login_resp, "登录")
_raise_if_failed(login_json, "登录")
data = login_json.get("data")
if not isinstance(data, Mapping):
raise UgreenLoginError("登录成功但响应 data 为空")
token = str(data.get("token") or "").strip()
public_key = str(data.get("public_key") or "").strip()
if not token:
raise UgreenLoginError("登录成功但未拿到 token")
if not public_key:
raise UgreenLoginError("登录成功但未拿到 public_key")
return token, _decode_public_key(public_key)
def _fetch_media_lib(
session: requests.Session,
base_url: str,
token: str,
public_key: str,
client_id: str,
client_version: str,
language: str,
page: int,
page_size: int,
timeout: float,
verify_ssl: bool,
) -> Any:
crypto = UgreenCrypto(
public_key=public_key,
token=token,
client_id=client_id,
client_version=client_version,
ug_agent="PC/WEB",
language=language,
)
req = crypto.build_encrypted_request(
url=f"{base_url}/ugreen/v1/video/homepage/media_list",
method="GET",
params={"page": page, "page_size": page_size},
)
media_resp = session.get(
req.url,
headers=req.headers,
params=req.params,
timeout=timeout,
verify=verify_ssl,
)
media_json = _json_or_raise(media_resp, "获取媒体库")
return crypto.decrypt_response(media_json, req.aes_key)
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="登录绿联 NAS 并调用媒体库接口(自动处理请求加密/响应解密)"
)
parser.add_argument("--host", help="服务器地址,例如: http://192.168.20.101:9999")
parser.add_argument("--username", help="用户名")
parser.add_argument("--password", help="密码(不传则交互输入)")
parser.add_argument("--client-id", help="可选,默认自动生成 UUID-WEB")
parser.add_argument("--client-version", default="76363", help="默认: 76363")
parser.add_argument("--language", default="zh-CN", help="默认: zh-CN")
parser.add_argument("--page", type=int, default=1, help="默认: 1")
parser.add_argument("--page-size", type=int, default=50, help="默认: 50")
parser.add_argument("--timeout", type=float, default=20.0, help="默认: 20 秒")
parser.add_argument("--insecure", action="store_true", help="忽略 HTTPS 证书校验")
parser.add_argument(
"--no-keepalive",
action="store_true",
help="关闭保持登录(默认保持登录)",
)
parser.add_argument("--pretty", action="store_true", help="美化输出 JSON")
parser.add_argument("--output", help="将解密后的结果写入文件")
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv or sys.argv[1:])
host = args.host or input("服务器地址: ").strip()
username = args.username or input("用户名: ").strip()
password = args.password or getpass.getpass("密码: ")
client_id = (args.client_id or f"{uuid.uuid4()}-WEB").strip()
keepalive = not args.no_keepalive
verify_ssl = not args.insecure
try:
base_url = _normalize_base_url(host)
if args.insecure:
requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined]
session = requests.Session()
headers = _build_common_headers(
client_id=client_id,
client_version=args.client_version,
language=args.language,
)
token, public_key = _login_and_get_access(
session=session,
base_url=base_url,
username=username,
password=password,
keepalive=keepalive,
headers=headers,
timeout=args.timeout,
verify_ssl=verify_ssl,
)
decoded = _fetch_media_lib(
session=session,
base_url=base_url,
token=token,
public_key=public_key,
client_id=client_id,
client_version=args.client_version,
language=args.language,
page=args.page,
page_size=args.page_size,
timeout=args.timeout,
verify_ssl=verify_ssl,
)
if isinstance(decoded, Mapping):
if decoded.get("code") != 200:
raise UgreenLoginError(
f"媒体库接口失败: code={decoded.get('code')} msg={decoded.get('msg')}"
)
media_count = None
data = decoded.get("data")
if isinstance(data, Mapping) and isinstance(data.get("media_lib_info_list"), list):
media_count = len(data["media_lib_info_list"])
print(
f"调用成功: code={decoded.get('code')} msg={decoded.get('msg')} "
f"media_lib_info_list={media_count}"
)
text = json.dumps(
decoded,
ensure_ascii=False,
indent=2 if args.pretty else None,
separators=(",", ":") if not args.pretty else None,
)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(text)
f.write("\n")
print(f"解密结果已写入: {args.output}")
else:
print(text)
return 0
except UgreenLoginError as exc:
print(f"错误: {exc}", file=sys.stderr)
return 1
except requests.RequestException as exc:
print(f"网络错误: {exc}", file=sys.stderr)
return 2
if __name__ == "__main__":
raise SystemExit(main())