mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 03:57:30 +08:00
300 lines
9.7 KiB
Python
300 lines
9.7 KiB
Python
from __future__ import annotations
|
||
|
||
import argparse
|
||
import base64
|
||
import getpass
|
||
import json
|
||
import os
|
||
import sys
|
||
import uuid
|
||
from typing import Any, Mapping
|
||
from urllib.parse import urlsplit, urlunsplit
|
||
|
||
# Support running this file directly as a script: in that mode the script's
# own directory (app/utils) sits first on sys.path, where same-named files
# could shadow standard-library modules.
if __name__ == "__main__" and __package__ is None:
    script_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(script_dir, "..", ".."))
    # Drop the script directory from the search path (first occurrence only);
    # it is fine if it was not there to begin with.
    try:
        sys.path.remove(script_dir)
    except ValueError:
        pass
    # Make the directory two levels up importable, with highest priority.
    if project_root not in sys.path:
        sys.path.insert(0, project_root)
|
||
|
||
import requests
|
||
|
||
from app.utils.ugreen_crypto import UgreenCrypto
|
||
|
||
|
||
class UgreenLoginError(Exception):
    """Error raised by this script when the login / media-library flow fails.

    Used for invalid user input (e.g. an empty server address), unexpected
    response shapes, and API responses whose ``code`` is not 200.
    """
|
||
|
||
|
||
def _normalize_base_url(raw: str) -> str:
    """Reduce a user-supplied server address to ``scheme://host[:port]``.

    Surrounding whitespace is stripped, ``http://`` is prepended when the
    value carries no http/https scheme, and any path, query string or
    fragment is discarded.

    Args:
        raw: Address as typed by the user, e.g. ``192.168.20.101:9999`` or
            ``https://nas.local/some/path``.

    Returns:
        The base URL without a trailing slash.

    Raises:
        UgreenLoginError: If the address is empty or cannot be parsed into
            a URL with a network location.
    """
    value = (raw or "").strip()
    if not value:
        raise UgreenLoginError("服务器地址不能为空")
    # URL schemes are case-insensitive, so "HTTP://host" must be recognised
    # as already having a scheme instead of being prefixed a second time.
    if not value.lower().startswith(("http://", "https://")):
        value = f"http://{value}"
    try:
        parsed = urlsplit(value)
    except ValueError as exc:
        # urlsplit rejects some inputs outright (e.g. an unbalanced IPv6
        # bracket); report that through the script's own error type so the
        # caller's error handling applies.
        raise UgreenLoginError(f"无效服务器地址: {raw}") from exc
    if not parsed.netloc:
        raise UgreenLoginError(f"无效服务器地址: {raw}")
    return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/")
|
||
|
||
|
||
def _json_or_raise(resp: requests.Response, stage: str) -> dict[str, Any]:
    """Decode a response body as a JSON object or fail with a clear error.

    Args:
        resp: HTTP response whose body should be JSON.
        stage: Label of the current step, used as a prefix in error messages.

    Returns:
        The decoded body, guaranteed to be a ``dict``.

    Raises:
        UgreenLoginError: If the body cannot be decoded as JSON, or decodes
            to something other than a ``dict``.
    """
    try:
        payload = resp.json()
    except Exception as exc:  # pragma: no cover - network error path
        # Include the status code and the first 200 characters of the body
        # so the failure can be diagnosed from the message alone.
        message = (
            f"{stage} 返回非 JSON,HTTP {resp.status_code},响应片段: {resp.text[:200]}"
        )
        raise UgreenLoginError(message) from exc

    if isinstance(payload, dict):
        return payload
    raise UgreenLoginError(f"{stage} 返回格式异常: {type(payload).__name__}")
|
||
|
||
|
||
def _decode_public_key(raw: str) -> str:
    """Return a public key as text, base64-decoding it when necessary.

    A value that already contains the marker ``BEGIN`` is returned as is
    (after stripping surrounding whitespace); anything else is treated as
    base64 and decoded to UTF-8 text.

    Args:
        raw: Key material as received from the server.

    Returns:
        The key text.

    Raises:
        UgreenLoginError: If the value is empty or cannot be decoded.
    """
    key_text = (raw or "").strip()
    if not key_text:
        raise UgreenLoginError("未获取到公钥")

    if "BEGIN" not in key_text:
        try:
            key_text = base64.b64decode(key_text).decode("utf-8")
        except Exception as exc:
            raise UgreenLoginError("公钥解码失败") from exc
    return key_text
|
||
|
||
|
||
def _raise_if_failed(payload: Mapping[str, Any], stage: str) -> None:
    """Raise unless the API payload reports ``code == 200``.

    Args:
        payload: Decoded JSON object returned by the server.
        stage: Label of the current step, used as a prefix in the message.

    Raises:
        UgreenLoginError: If ``payload["code"]`` is missing or not 200; the
            message carries the payload's ``code`` and ``msg`` values.
    """
    if payload.get("code") != 200:
        raise UgreenLoginError(
            f"{stage}失败: code={payload.get('code')} msg={payload.get('msg')}"
        )
|
||
|
||
|
||
def _build_common_headers(
    client_id: str, client_version: str, language: str
) -> dict[str, str]:
    """Build the HTTP headers sent with the unencrypted login requests.

    Args:
        client_id: Value for the ``Client-Id`` header.
        client_version: Value for the ``Client-Version`` header.
        language: Value for the ``X-Specify-Language`` header.

    Returns:
        A new header dict; ``Accept`` and ``UG-Agent`` are fixed values.
    """
    headers: dict[str, str] = {"Accept": "application/json, text/plain, */*"}
    headers["Client-Id"] = client_id
    headers["Client-Version"] = client_version
    headers["UG-Agent"] = "PC/WEB"
    headers["X-Specify-Language"] = language
    return headers
|
||
|
||
|
||
def _login_and_get_access(
    session: requests.Session,
    base_url: str,
    username: str,
    password: str,
    keepalive: bool,
    headers: Mapping[str, str],
    timeout: float,
    verify_ssl: bool,
) -> tuple[str, str]:
    """Log in and return the access token together with the session public key.

    The flow has two requests:

    1. POST ``/ugreen/v1/verify/check`` with the username to obtain the
       login public key (looked up under the name ``x-rsa-token`` /
       ``xRsaToken`` in the response headers, the JSON body, or the body's
       ``data`` object, in that order).
    2. POST ``/ugreen/v1/verify/login`` with the password encrypted by
       ``UgreenCrypto.rsa_encrypt_long`` using that key.

    Args:
        session: Session used for both requests.
        base_url: Server base URL without a trailing slash.
        username: Account name.
        password: Plain-text password; only its encrypted form is sent.
        keepalive: Value of the ``keepalive`` field in the login payload.
        headers: Headers sent with both requests (copied per request).
        timeout: Per-request timeout passed to ``requests``.
        verify_ssl: Whether TLS certificates are verified.

    Returns:
        ``(token, public_key)`` where ``public_key`` has been passed through
        ``_decode_public_key``.

    Raises:
        UgreenLoginError: If either response is not a JSON object, reports a
            non-200 ``code``, or lacks the expected key/token fields.
    """
    # --- Step 1: fetch the public key used to encrypt the password. ---
    check_stage = "获取登录公钥"
    check_resp = session.post(
        f"{base_url}/ugreen/v1/verify/check",
        json={"username": username},
        headers=dict(headers),
        timeout=timeout,
        verify=verify_ssl,
    )
    check_json = _json_or_raise(check_resp, check_stage)
    _raise_if_failed(check_json, check_stage)

    # Probe the known locations in priority order and stop at the first
    # non-empty value: response headers first, then top-level JSON fields.
    rsa_token = None
    lookups = (
        (check_resp.headers, "x-rsa-token"),
        (check_resp.headers, "X-Rsa-Token"),
        (check_json, "xRsaToken"),
        (check_json, "x-rsa-token"),
    )
    for container, key in lookups:
        rsa_token = container.get(key)
        if rsa_token:
            break
    if not rsa_token:
        # Last resort: the same field names nested under "data".
        nested = check_json.get("data")
        if isinstance(nested, Mapping):
            rsa_token = nested.get("xRsaToken") or nested.get("x-rsa-token")
    if not rsa_token:
        raise UgreenLoginError("登录公钥为空(x-rsa-token)")

    # --- Step 2: log in with the RSA-encrypted password. ---
    login_public_key = _decode_public_key(str(rsa_token))
    cipher = UgreenCrypto(public_key=login_public_key)
    encrypted_password = cipher.rsa_encrypt_long(password)

    login_stage = "登录"
    login_resp = session.post(
        f"{base_url}/ugreen/v1/verify/login",
        json={
            "username": username,
            "password": encrypted_password,
            "keepalive": keepalive,
            "otp": True,
            "is_simple": True,
        },
        headers=dict(headers),
        timeout=timeout,
        verify=verify_ssl,
    )
    login_json = _json_or_raise(login_resp, login_stage)
    _raise_if_failed(login_json, login_stage)

    data = login_json.get("data")
    if not isinstance(data, Mapping):
        raise UgreenLoginError("登录成功但响应 data 为空")

    # Missing or null fields collapse to an empty string and are rejected.
    token = str(data.get("token") or "").strip()
    if not token:
        raise UgreenLoginError("登录成功但未拿到 token")
    public_key = str(data.get("public_key") or "").strip()
    if not public_key:
        raise UgreenLoginError("登录成功但未拿到 public_key")
    return token, _decode_public_key(public_key)
|
||
|
||
|
||
def _fetch_media_lib(
    session: requests.Session,
    base_url: str,
    token: str,
    public_key: str,
    client_id: str,
    client_version: str,
    language: str,
    page: int,
    page_size: int,
    timeout: float,
    verify_ssl: bool,
) -> Any:
    """Call the media-library list endpoint and return the decrypted result.

    The request (URL, headers, query parameters) is produced by
    ``UgreenCrypto.build_encrypted_request`` and the JSON response is handed
    to ``UgreenCrypto.decrypt_response`` together with the request's
    ``aes_key``.

    Args:
        session: Session used for the GET request.
        base_url: Server base URL without a trailing slash.
        token: Access token obtained at login.
        public_key: Public key obtained at login.
        client_id: Client identifier passed to ``UgreenCrypto``.
        client_version: Client version passed to ``UgreenCrypto``.
        language: Language tag passed to ``UgreenCrypto``.
        page: Value of the ``page`` query parameter.
        page_size: Value of the ``page_size`` query parameter.
        timeout: Request timeout passed to ``requests``.
        verify_ssl: Whether TLS certificates are verified.

    Returns:
        Whatever ``UgreenCrypto.decrypt_response`` returns for the response.

    Raises:
        UgreenLoginError: If the response body is not a JSON object.
    """
    endpoint = f"{base_url}/ugreen/v1/video/homepage/media_list"
    query = {"page": page, "page_size": page_size}

    crypto = UgreenCrypto(
        public_key=public_key,
        token=token,
        client_id=client_id,
        client_version=client_version,
        ug_agent="PC/WEB",
        language=language,
    )
    req = crypto.build_encrypted_request(url=endpoint, method="GET", params=query)

    # Send exactly what the crypto helper produced, not the plain values.
    media_resp = session.get(
        req.url,
        headers=req.headers,
        params=req.params,
        timeout=timeout,
        verify=verify_ssl,
    )
    media_json = _json_or_raise(media_resp, "获取媒体库")
    return crypto.decrypt_response(media_json, req.aes_key)
|
||
|
||
|
||
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line options of this script.

    Args:
        argv: Argument list without the program name.

    Returns:
        Namespace with ``host``, ``username``, ``password``, ``client_id``,
        ``client_version``, ``language``, ``page``, ``page_size``,
        ``timeout``, ``insecure``, ``no_keepalive``, ``pretty`` and
        ``output``.
    """
    parser = argparse.ArgumentParser(
        description="登录绿联 NAS 并调用媒体库接口(自动处理请求加密/响应解密)"
    )
    # (flag, add_argument keyword arguments), registered in display order.
    option_specs = (
        ("--host", {"help": "服务器地址,例如: http://192.168.20.101:9999"}),
        ("--username", {"help": "用户名"}),
        ("--password", {"help": "密码(不传则交互输入)"}),
        ("--client-id", {"help": "可选,默认自动生成 UUID-WEB"}),
        ("--client-version", {"default": "76363", "help": "默认: 76363"}),
        ("--language", {"default": "zh-CN", "help": "默认: zh-CN"}),
        ("--page", {"type": int, "default": 1, "help": "默认: 1"}),
        ("--page-size", {"type": int, "default": 50, "help": "默认: 50"}),
        ("--timeout", {"type": float, "default": 20.0, "help": "默认: 20 秒"}),
        ("--insecure", {"action": "store_true", "help": "忽略 HTTPS 证书校验"}),
        (
            "--no-keepalive",
            {"action": "store_true", "help": "关闭保持登录(默认保持登录)"},
        ),
        ("--pretty", {"action": "store_true", "help": "美化输出 JSON"}),
        ("--output", {"help": "将解密后的结果写入文件"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Run the CLI: log in, fetch the media library and print or save it.

    Host, username and password are prompted for interactively when they
    are not given on the command line.

    Args:
        argv: Argument list without the program name. ``None`` means "use
            ``sys.argv[1:]``"; an explicit list, including an empty one, is
            parsed as given.

    Returns:
        ``0`` on success, ``1`` when a ``UgreenLoginError`` was raised and
        ``2`` on a ``requests.RequestException``.
    """
    # Only fall back to the process arguments when nothing was passed at all;
    # ``argv or sys.argv[1:]`` would silently ignore an explicit empty list.
    args = parse_args(sys.argv[1:] if argv is None else argv)

    host = args.host or input("服务器地址: ").strip()
    username = args.username or input("用户名: ").strip()
    password = args.password or getpass.getpass("密码: ")
    client_id = (args.client_id or f"{uuid.uuid4()}-WEB").strip()
    keepalive = not args.no_keepalive
    verify_ssl = not args.insecure

    try:
        base_url = _normalize_base_url(host)
        if args.insecure:
            # Certificate checks are off on request, so silence the
            # per-request warnings urllib3 would otherwise print.
            requests.packages.urllib3.disable_warnings()  # type: ignore[attr-defined]

        headers = _build_common_headers(
            client_id=client_id,
            client_version=args.client_version,
            language=args.language,
        )

        # The context manager closes the session (and its pooled
        # connections) on every exit path, including errors.
        with requests.Session() as session:
            token, public_key = _login_and_get_access(
                session=session,
                base_url=base_url,
                username=username,
                password=password,
                keepalive=keepalive,
                headers=headers,
                timeout=args.timeout,
                verify_ssl=verify_ssl,
            )
            decoded = _fetch_media_lib(
                session=session,
                base_url=base_url,
                token=token,
                public_key=public_key,
                client_id=client_id,
                client_version=args.client_version,
                language=args.language,
                page=args.page,
                page_size=args.page_size,
                timeout=args.timeout,
                verify_ssl=verify_ssl,
            )

        # Only a mapping result can be checked for an API status code; any
        # other decoded value is emitted unchanged.
        if isinstance(decoded, Mapping):
            if decoded.get("code") != 200:
                raise UgreenLoginError(
                    f"媒体库接口失败: code={decoded.get('code')} msg={decoded.get('msg')}"
                )
            media_count = None
            data = decoded.get("data")
            if isinstance(data, Mapping) and isinstance(data.get("media_lib_info_list"), list):
                media_count = len(data["media_lib_info_list"])
            print(
                f"调用成功: code={decoded.get('code')} msg={decoded.get('msg')} "
                f"media_lib_info_list={media_count}"
            )

        # Pretty mode indents; otherwise emit the most compact form.
        text = json.dumps(
            decoded,
            ensure_ascii=False,
            indent=2 if args.pretty else None,
            separators=(",", ":") if not args.pretty else None,
        )
        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(text)
                f.write("\n")
            print(f"解密结果已写入: {args.output}")
        else:
            print(text)
        return 0
    except UgreenLoginError as exc:
        print(f"错误: {exc}", file=sys.stderr)
        return 1
    except requests.RequestException as exc:
        print(f"网络错误: {exc}", file=sys.stderr)
        return 2
|
||
|
||
|
||
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|