Files
MoviePilot/app/modules/ugreen/api.py

751 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Mapping, Optional, Union
from urllib.parse import urlsplit, urlunsplit
from requests import Session
from app.log import logger
from app.utils.ugreen_crypto import UgreenCrypto
from app.utils.url import UrlUtils
@dataclass
class ApiResult:
code: int = -1
msg: str = ""
data: Any = None
debug: Optional[str] = None
raw: Optional[dict] = None
@property
def success(self) -> bool:
return self.code == 200
class Api:
"""
绿联影视 API 客户端(统一加密通道)。
说明:
1. 所有业务接口调用都应走 `request()`
2. `request()` 会自动将明文查询参数加密为 `encrypt_query`
3. 若响应包含 `encrypt_resp_body`,会自动完成解密后再返回。
"""
__slots__ = (
"_host",
"_session",
"_token",
"_static_token",
"_is_ugk",
"_public_key",
"_crypto",
"_username",
"_client_id",
"_client_version",
"_language",
"_ug_agent",
"_timeout",
"_verify_ssl",
)
def __init__(
self,
host: str,
client_version: str = "76363",
language: str = "zh-CN",
ug_agent: str = "PC/WEB",
timeout: int = 20,
verify_ssl: bool = True,
):
self._host = self._normalize_base_url(host)
self._session = Session()
self._token: Optional[str] = None
self._static_token: Optional[str] = None
self._is_ugk: bool = False
self._public_key: Optional[str] = None
self._crypto: Optional[UgreenCrypto] = None
self._username: Optional[str] = None
self._client_id = f"{uuid.uuid4()}-WEB"
self._client_version = client_version
self._language = language
self._ug_agent = ug_agent
self._timeout = timeout
# 是否校验证书,默认开启;仅在用户明确配置时才应关闭。
self._verify_ssl = bool(verify_ssl)
@property
def host(self) -> str:
return self._host
@property
def token(self) -> Optional[str]:
return self._token
@property
def static_token(self) -> Optional[str]:
return self._static_token
@property
def is_ugk(self) -> bool:
return self._is_ugk
@property
def public_key(self) -> Optional[str]:
return self._public_key
def close(self):
"""
关闭底层 HTTP 会话。
"""
self._session.close()
@staticmethod
def _normalize_base_url(host: str) -> str:
if not host:
return ""
host = UrlUtils.standardize_base_url(host).rstrip("/")
parsed = urlsplit(host)
return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/")
@staticmethod
def _decode_public_key(raw: Optional[str]) -> Optional[str]:
if not raw:
return None
value = str(raw).strip()
if not value:
return None
if "BEGIN" in value:
return value
try:
return base64.b64decode(value).decode("utf-8")
except Exception:
return None
@staticmethod
def _extract_rsa_token(resp_json: dict, headers: Mapping[str, str]) -> Optional[str]:
token = headers.get("x-rsa-token") or headers.get("X-Rsa-Token")
if token:
return token
token = resp_json.get("xRsaToken") or resp_json.get("x-rsa-token")
if token:
return token
data = resp_json.get("data") if isinstance(resp_json, Mapping) else None
if isinstance(data, Mapping):
return data.get("xRsaToken") or data.get("x-rsa-token")
return None
def _common_headers(self) -> dict[str, str]:
"""
获取绿联 Web 端通用请求头。
"""
return {
"Accept": "application/json, text/plain, */*",
"Client-Id": self._client_id,
"Client-Version": self._client_version,
"UG-Agent": self._ug_agent,
"X-Specify-Language": self._language,
}
def _request_json(
self,
url: str,
method: str = "GET",
headers: Optional[dict] = None,
params: Optional[dict] = None,
json_data: Optional[dict] = None,
) -> Optional[dict]:
"""
发送 HTTP 请求并尝试解析为 JSON。
"""
try:
method = method.upper()
if method == "POST":
resp = self._session.post(
url=url,
headers=headers,
params=params,
json=json_data,
timeout=self._timeout,
verify=self._verify_ssl,
)
else:
resp = self._session.get(
url=url,
headers=headers,
params=params,
timeout=self._timeout,
verify=self._verify_ssl,
)
return resp.json()
except Exception as err:
logger.error(f"请求绿联接口失败:{url} {err}")
return None
@staticmethod
def _build_result(payload: Any) -> ApiResult:
if not isinstance(payload, Mapping):
return ApiResult(code=-1, msg="响应格式错误", raw=None)
code = payload.get("code")
try:
code = int(code)
except Exception:
code = -1
return ApiResult(
code=code,
msg=str(payload.get("msg") or ""),
data=payload.get("data"),
debug=payload.get("debug"),
raw=dict(payload),
)
def login(self, username: str, password: str, keepalive: bool = True) -> Optional[str]:
"""
登录绿联账号并初始化加密上下文。
:param username: 用户名
:param password: 密码(会先做 RSA 分段加密)
:param keepalive: 是否保持登录
:return: 登录成功返回 token
"""
if not username or not password:
return None
headers = self._common_headers()
try:
check_resp = self._session.post(
url=f"{self._host}/ugreen/v1/verify/check",
headers=headers,
json={"username": username},
timeout=self._timeout,
verify=self._verify_ssl,
)
check_json = check_resp.json()
except Exception as err:
logger.error(f"绿联获取登录公钥失败:{err}")
return None
check_result = self._build_result(check_json)
if not check_result.success:
logger.error(f"绿联获取登录公钥失败:{check_result.msg}")
return None
rsa_token = self._extract_rsa_token(check_json, check_resp.headers)
login_public_key = self._decode_public_key(rsa_token)
if not login_public_key:
logger.error("绿联获取登录公钥失败:公钥为空")
return None
encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long(password)
login_json = self._request_json(
url=f"{self._host}/ugreen/v1/verify/login",
method="POST",
headers=headers,
json_data={
"username": username,
"password": encrypted_password,
"keepalive": keepalive,
"otp": True,
"is_simple": True,
},
)
if not login_json:
return None
login_result = self._build_result(login_json)
if not login_result.success or not isinstance(login_result.data, Mapping):
logger.error(f"绿联登录失败:{login_result.msg}")
return None
token = str(login_result.data.get("token") or "").strip()
public_key = self._decode_public_key(str(login_result.data.get("public_key") or ""))
if not token or not public_key:
logger.error("绿联登录失败:未返回 token/public_key")
return None
self._token = token
static_token = str(login_result.data.get("static_token") or "").strip()
self._static_token = static_token or self._token
self._is_ugk = bool(login_result.data.get("is_ugk"))
self._public_key = public_key
self._crypto = UgreenCrypto(
public_key=self._public_key,
token=self._token,
client_id=self._client_id,
client_version=self._client_version,
ug_agent=self._ug_agent,
language=self._language,
)
self._username = username
return self._token
def export_session_state(self) -> Optional[dict]:
"""
导出当前登录会话,供持久化存储使用。
"""
if not self._token or not self._public_key:
return None
return {
"token": self._token,
"static_token": self._static_token,
"is_ugk": self._is_ugk,
"public_key": self._public_key,
"username": self._username,
"client_id": self._client_id,
"client_version": self._client_version,
"language": self._language,
"ug_agent": self._ug_agent,
"cookies": self._session.cookies.get_dict(),
}
def import_session_state(self, state: Mapping[str, Any]) -> bool:
"""
从持久化数据恢复登录会话,避免重复登录。
"""
if not isinstance(state, Mapping):
return False
token = str(state.get("token") or "").strip()
public_key = self._decode_public_key(str(state.get("public_key") or ""))
if not token or not public_key:
return False
static_token = str(state.get("static_token") or "").strip()
is_ugk = bool(state.get("is_ugk"))
# 会话可能与 client_id 绑定,需恢复原客户端信息
client_id = str(state.get("client_id") or "").strip()
if client_id:
self._client_id = client_id
client_version = str(state.get("client_version") or "").strip()
if client_version:
self._client_version = client_version
language = str(state.get("language") or "").strip()
if language:
self._language = language
ug_agent = str(state.get("ug_agent") or "").strip()
if ug_agent:
self._ug_agent = ug_agent
username = str(state.get("username") or "").strip()
self._username = username or None
cookies = state.get("cookies")
if isinstance(cookies, Mapping):
try:
self._session.cookies.update(
{
str(k): str(v)
for k, v in cookies.items()
if k is not None and v is not None
}
)
except Exception:
pass
self._token = token
self._static_token = static_token or self._token
self._is_ugk = is_ugk
self._public_key = public_key
self._crypto = UgreenCrypto(
public_key=self._public_key,
token=self._token,
client_id=self._client_id,
client_version=self._client_version,
ug_agent=self._ug_agent,
language=self._language,
)
return True
def logout(self):
"""
登出并清理本地认证状态。
"""
if not self._token or not self._crypto:
return
try:
req = self._crypto.build_encrypted_request(
url=f"{self._host}/ugreen/v1/verify/logout",
method="GET",
params={},
)
self._session.get(
req.url,
headers=req.headers,
params=req.params,
timeout=self._timeout,
verify=self._verify_ssl,
)
except Exception:
pass
self._token = None
self._static_token = None
self._is_ugk = False
self._public_key = None
self._crypto = None
self._username = None
def request(
self,
path: str,
method: str = "GET",
params: Optional[dict] = None,
data: Optional[dict] = None,
) -> ApiResult:
"""
统一请求入口。
核心行为:
1. 自动把 `params` 明文序列化并加密为 `encrypt_query`
2. 自动注入绿联安全头(`X-Ugreen-*`
3. 对 `POST/PUT/PATCH` 的 JSON 体加密;
4. 自动解密 `encrypt_resp_body`。
:param path: `/ugreen/` 后的相对路径,例如 `v1/video/homepage/media_list`
:param method: HTTP 方法
:param params: 明文查询参数(无需自己处理 encrypt_query
:param data: 明文 JSON 请求体(自动加密)
"""
if not self._crypto:
return ApiResult(code=-1, msg="未登录")
api_path = path.strip("/")
# 由加密工具自动构建 encrypt_query 与加密请求体
req = self._crypto.build_encrypted_request(
url=f"{self._host}/ugreen/{api_path}",
method=method.upper(),
params=params or {},
data=data,
encrypt_body=method.upper() in {"POST", "PUT", "PATCH"},
)
payload = self._request_json(
url=req.url,
method=method,
headers=req.headers,
params=req.params,
json_data=req.json,
)
if payload is None:
return ApiResult(code=-1, msg="接口请求失败")
# 响应若包含 encrypt_resp_body这里会自动解密
decrypted = self._crypto.decrypt_response(payload, req.aes_key)
return self._build_result(decrypted)
def current_user(self) -> Optional[dict]:
"""
获取当前登录用户信息。
"""
result = self.request("v1/user/current/user")
if not result.success or not isinstance(result.data, Mapping):
return None
return dict(result.data)
def media_list(self) -> list[dict]:
"""
获取首页媒体库列表(`media_lib_info_list`)。
"""
result = self.request("v1/video/homepage/media_list")
if not result.success or not isinstance(result.data, Mapping):
return []
items = result.data.get("media_lib_info_list")
return items if isinstance(items, list) else []
def media_lib_users(self) -> list[dict]:
"""
获取媒体库用户列表。
"""
result = self.request("v1/video/media_lib/get_user_list")
if not result.success or not isinstance(result.data, Mapping):
return []
users = result.data.get("user_info_arr")
return users if isinstance(users, list) else []
def recently_played(self, page: int = 1, page_size: int = 12) -> Optional[dict]:
"""
获取继续观看列表。
"""
result = self.request(
"v1/video/recently_played/get",
params={
"page": page,
"page_size": page_size,
"language": self._language,
"create_time_order": "false",
},
)
return result.data if result.success and isinstance(result.data, Mapping) else None
def recently_updated(self, page: int = 1, page_size: int = 20) -> Optional[dict]:
"""
获取最近更新列表。
"""
result = self.request(
"v1/video/recently_update/get",
params={
"page": page,
"page_size": page_size,
"language": self._language,
"create_time_order": "false",
},
)
return result.data if result.success and isinstance(result.data, Mapping) else None
def recently_played_info(self, item_id: Union[str, int]) -> Optional[dict]:
"""
获取单个视频的播放状态与基础详情信息。
"""
result = self.request(
"v1/video/recently_played/info",
params={
"ug_video_info_id": item_id,
"version_control": "true",
},
)
if result.code in {200, 1303} and isinstance(result.data, Mapping):
return dict(result.data)
return None
def search(self, keyword: str, offset: int = 0, limit: int = 200) -> Optional[dict]:
"""
搜索媒体(电影/剧集)。
"""
result = self.request(
"v1/video/search",
params={
"language": self._language,
"search_type": 1,
"offset": offset,
"limit": limit,
"keyword": keyword,
},
)
return result.data if result.success and isinstance(result.data, Mapping) else None
def video_all(self, classification: int, page: int = 1, page_size: int = 20) -> Optional[dict]:
"""
获取 `v1/video/all` 分类列表。
常用分类:
-102: 电影
-103: 电视剧
"""
result = self.request(
"v1/video/all",
params={
"page": page,
"pageSize": page_size,
"classification": classification,
"sort_type": 2,
"order_type": 2,
"release_date_begin": -9999999999,
"release_date_end": -9999999999,
"identify_status": 0,
"watch_status": -1,
"ug_style_id": 0,
"ug_country_id": 0,
"clarity": -1,
},
)
return result.data if result.success and isinstance(result.data, Mapping) else None
def poster_wall_get_folder(
self,
path: Optional[str] = None,
page: int = 1,
page_size: int = 100,
sort_type: int = 1,
order_type: int = 1,
) -> Optional[dict]:
"""
获取海报墙文件夹与条目(可按目录路径递归展开)。
"""
params: Dict[str, Any] = {
"page": page,
"page_size": page_size,
"sort_type": sort_type,
"order_type": order_type,
}
if path:
params["path"] = path
result = self.request("v1/video/poster_wall/media_lib/get_folder", params=params)
return result.data if result.success and isinstance(result.data, Mapping) else None
def get_movie(
self,
item_id: Union[str, int],
media_lib_set_id: Union[str, int],
path: Optional[str] = None,
folder_path: Optional[str] = None,
) -> Optional[dict]:
"""
获取电影详情。
"""
params: Dict[str, Any] = {
"id": item_id,
"media_lib_set_id": media_lib_set_id,
"fileVersion": "true",
}
if path:
params["path"] = path
if folder_path:
params["folder_path"] = folder_path
result = self.request("v1/video/details/getMovie", params=params)
return result.data if result.success and isinstance(result.data, Mapping) else None
def get_tv(self, item_id: Union[str, int], folder_path: str = "ALL") -> Optional[dict]:
"""
获取剧集详情(含季/集信息)。
"""
result = self.request(
"v2/video/details/getTV",
params={
"ug_video_info_id": item_id,
"folder_path": folder_path,
},
)
return result.data if result.success and isinstance(result.data, Mapping) else None
def scan(self, media_lib_set_id: Union[str, int], scan_type: int = 2, op_type: int = 2) -> bool:
"""
触发媒体库扫描。
:param media_lib_set_id: 媒体库 ID
:param scan_type: 扫描类型1: 新添加和修改, 2: 补充缺失, 3: 覆盖扫描)
:param op_type: 操作类型(网页端常用 2
"""
result = self.request(
"v1/video/media_lib/scan",
params={
"op_type": op_type,
"media_lib_set_id": media_lib_set_id,
"media_lib_scan_type": scan_type,
},
)
return result.success
def scan_status(self, only_brief: bool = True) -> list[dict]:
"""
获取媒体库扫描状态。
"""
result = self.request(
"v1/video/media_lib/scan/status",
params={"only_brief": "true" if only_brief else "false"},
)
if not result.success or not isinstance(result.data, Mapping):
return []
arr = result.data.get("media_lib_scan_status_arr")
return arr if isinstance(arr, list) else []
def preferences_all(self) -> Optional[Any]:
"""
获取影视偏好设置(`v1/video/preferences/all`)。
"""
result = self.request("v1/video/preferences/all")
return result.data if result.success else None
def history_get(self, num: int = 10) -> Optional[Any]:
"""
获取历史记录(`v1/video/history/get`)。
"""
result = self.request("v1/video/history/get", params={"num": num})
return result.data if result.success else None
def data_source_get_config(self) -> Optional[Any]:
"""
获取数据源配置(`v1/video/data_source/get_config`)。
"""
result = self.request("v1/video/data_source/get_config")
return result.data if result.success else None
def homepage_slider(
self, language: Optional[str] = None, app_name: str = "web"
) -> Optional[Any]:
"""
获取首页轮播数据(`v1/video/homepage/slider`)。
"""
result = self.request(
"v1/video/homepage/slider",
params={
"language": language or self._language,
"app_name": app_name,
},
)
return result.data if result.success else None
def media_lib_guide_init(self) -> Optional[Any]:
"""
获取媒体库引导初始化信息(`v1/video/media_lib/guide_init`)。
"""
result = self.request("v1/video/media_lib/guide_init")
return result.data if result.success else None
def media_lib_filter_options(
self, media_type: int = 0, language: Optional[str] = None
) -> Optional[Any]:
"""
获取媒体库筛选项(`v1/video/media_lib/filter/options`)。
"""
result = self.request(
"v1/video/media_lib/filter/options",
params={
"type": media_type,
"language": language or self._language,
},
)
return result.data if result.success else None
def guide(self, guide_position: int = 1, client_type: int = 1) -> Optional[Any]:
"""
获取引导位数据(`v1/video/guide`)。
"""
result = self.request(
"v1/video/guide",
params={
"guide_position": guide_position,
"client_type": client_type,
},
)
return result.data if result.success else None
def homepage_v2(self, language: Optional[str] = None) -> Optional[Any]:
"""
获取新版首页聚合数据(`v2/video/homepage`)。
"""
result = self.request(
"v2/video/homepage",
params={"language": language or self._language},
)
return result.data if result.success else None
def media_lib_init_user_permission(self) -> Optional[Any]:
"""
初始化用户媒体库权限(`v1/video/media_lib/init_user_permission`)。
"""
result = self.request("v1/video/media_lib/init_user_permission")
return result.data if result.success else None
def media_lib_get_all(
self, req_type: int = 2, language: Optional[str] = None
) -> Optional[Any]:
"""
获取全部媒体库集合(`v1/video/media_lib/get_all`)。
"""
result = self.request(
"v1/video/media_lib/get_all",
params={
"mediaLib_get_all_req_type": req_type,
"language": language or self._language,
},
)
return result.data if result.success else None