mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-19 19:46:55 +08:00
Merge pull request #5525 from baozaodetudou/orv2
This commit is contained in:
@@ -26,11 +26,17 @@ def statistic(name: Optional[str] = None, _: schemas.TokenPayload = Depends(veri
|
||||
if media_statistics:
|
||||
# 汇总各媒体库统计信息
|
||||
ret_statistic = schemas.Statistic()
|
||||
has_episode_count = False
|
||||
for media_statistic in media_statistics:
|
||||
ret_statistic.movie_count += media_statistic.movie_count
|
||||
ret_statistic.tv_count += media_statistic.tv_count
|
||||
ret_statistic.episode_count += media_statistic.episode_count
|
||||
ret_statistic.user_count += media_statistic.user_count
|
||||
ret_statistic.movie_count += media_statistic.movie_count or 0
|
||||
ret_statistic.tv_count += media_statistic.tv_count or 0
|
||||
ret_statistic.user_count += media_statistic.user_count or 0
|
||||
if media_statistic.episode_count is not None:
|
||||
ret_statistic.episode_count += media_statistic.episode_count or 0
|
||||
has_episode_count = True
|
||||
if not has_episode_count:
|
||||
# 所有媒体服务都未提供剧集统计时,返回 None 供前端展示“未获取”。
|
||||
ret_statistic.episode_count = None
|
||||
return ret_statistic
|
||||
else:
|
||||
return schemas.Statistic()
|
||||
|
||||
358
app/modules/ugreen/__init__.py
Normal file
358
app/modules/ugreen/__init__.py
Normal file
@@ -0,0 +1,358 @@
|
||||
from typing import Any, Generator, List, Optional, Tuple, Union
|
||||
|
||||
from app import schemas
|
||||
from app.core.context import MediaInfo
|
||||
from app.core.event import eventmanager
|
||||
from app.log import logger
|
||||
from app.modules import _MediaServerBase, _ModuleBase
|
||||
from app.modules.ugreen.ugreen import Ugreen
|
||||
from app.schemas import AuthCredentials, AuthInterceptCredentials
|
||||
from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType
|
||||
|
||||
|
||||
class UgreenModule(_ModuleBase, _MediaServerBase[Ugreen]):
|
||||
|
||||
def init_module(self) -> None:
|
||||
"""
|
||||
初始化模块
|
||||
"""
|
||||
super().init_service(
|
||||
service_name=Ugreen.__name__.lower(),
|
||||
service_type=lambda conf: Ugreen(
|
||||
**conf.config, sync_libraries=conf.sync_libraries
|
||||
),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_name() -> str:
|
||||
return "绿联影视"
|
||||
|
||||
@staticmethod
|
||||
def get_type() -> ModuleType:
|
||||
"""
|
||||
获取模块类型
|
||||
"""
|
||||
return ModuleType.MediaServer
|
||||
|
||||
@staticmethod
|
||||
def get_subtype() -> MediaServerType:
|
||||
"""
|
||||
获取模块子类型
|
||||
"""
|
||||
return MediaServerType.Ugreen
|
||||
|
||||
@staticmethod
|
||||
def get_priority() -> int:
|
||||
"""
|
||||
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
|
||||
"""
|
||||
return 5
|
||||
|
||||
def init_setting(self) -> Tuple[str, Union[str, bool]]:
|
||||
pass
|
||||
|
||||
def scheduler_job(self) -> None:
|
||||
"""
|
||||
定时任务,每10分钟调用一次
|
||||
"""
|
||||
for name, server in self.get_instances().items():
|
||||
if server.is_configured() and server.is_inactive():
|
||||
logger.info(f"绿联影视 {name} 连接断开,尝试重连 ...")
|
||||
server.reconnect()
|
||||
|
||||
def stop(self):
|
||||
for server in self.get_instances().values():
|
||||
if server.is_authenticated():
|
||||
server.disconnect()
|
||||
|
||||
def test(self) -> Optional[Tuple[bool, str]]:
|
||||
"""
|
||||
测试模块连接性
|
||||
"""
|
||||
if not self.get_instances():
|
||||
return None
|
||||
for name, server in self.get_instances().items():
|
||||
if not server.is_configured():
|
||||
return False, f"绿联影视配置不完整:{name}"
|
||||
if server.is_inactive() and not server.reconnect():
|
||||
return False, f"无法连接绿联影视:{name}"
|
||||
return True, ""
|
||||
|
||||
def user_authenticate(
|
||||
self, credentials: AuthCredentials, service_name: Optional[str] = None
|
||||
) -> Optional[AuthCredentials]:
|
||||
"""
|
||||
使用绿联影视用户辅助完成用户认证
|
||||
"""
|
||||
if not credentials or credentials.grant_type != "password":
|
||||
return None
|
||||
|
||||
if service_name:
|
||||
servers = (
|
||||
[(service_name, server)]
|
||||
if (server := self.get_instance(service_name))
|
||||
else []
|
||||
)
|
||||
else:
|
||||
servers = self.get_instances().items()
|
||||
|
||||
for name, server in servers:
|
||||
intercept_event = eventmanager.send_event(
|
||||
etype=ChainEventType.AuthIntercept,
|
||||
data=AuthInterceptCredentials(
|
||||
username=credentials.username,
|
||||
channel=self.get_name(),
|
||||
service=name,
|
||||
status="triggered",
|
||||
),
|
||||
)
|
||||
if intercept_event and intercept_event.event_data:
|
||||
intercept_data: AuthInterceptCredentials = intercept_event.event_data
|
||||
if intercept_data.cancel:
|
||||
continue
|
||||
token = server.authenticate(credentials.username, credentials.password)
|
||||
if token:
|
||||
credentials.channel = self.get_name()
|
||||
credentials.service = name
|
||||
credentials.token = token
|
||||
return credentials
|
||||
return None
|
||||
|
||||
def webhook_parser(
|
||||
self, body: Any, form: Any, args: Any
|
||||
) -> Optional[schemas.WebhookEventInfo]:
|
||||
"""
|
||||
解析Webhook报文体
|
||||
"""
|
||||
source = args.get("source")
|
||||
if source:
|
||||
server: Optional[Ugreen] = self.get_instance(source)
|
||||
if not server:
|
||||
return None
|
||||
result = server.get_webhook_message(body)
|
||||
if result:
|
||||
result.server_name = source
|
||||
return result
|
||||
|
||||
for server in self.get_instances().values():
|
||||
if server:
|
||||
result = server.get_webhook_message(body)
|
||||
if result:
|
||||
return result
|
||||
return None
|
||||
|
||||
def media_exists(
|
||||
self,
|
||||
mediainfo: MediaInfo,
|
||||
itemid: Optional[str] = None,
|
||||
server: Optional[str] = None,
|
||||
) -> Optional[schemas.ExistMediaInfo]:
|
||||
"""
|
||||
判断媒体文件是否存在
|
||||
"""
|
||||
if server:
|
||||
servers = [(server, self.get_instance(server))]
|
||||
else:
|
||||
servers = self.get_instances().items()
|
||||
|
||||
for name, s in servers:
|
||||
if not s:
|
||||
continue
|
||||
if mediainfo.type == MediaType.MOVIE:
|
||||
if itemid:
|
||||
movie = s.get_iteminfo(itemid)
|
||||
if movie:
|
||||
logger.info(f"媒体库 {name} 中找到了 {movie}")
|
||||
return schemas.ExistMediaInfo(
|
||||
type=MediaType.MOVIE,
|
||||
server_type="ugreen",
|
||||
server=name,
|
||||
itemid=movie.item_id,
|
||||
)
|
||||
movies = s.get_movies(
|
||||
title=mediainfo.title,
|
||||
year=mediainfo.year,
|
||||
tmdb_id=mediainfo.tmdb_id,
|
||||
)
|
||||
if not movies:
|
||||
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中")
|
||||
continue
|
||||
logger.info(f"媒体库 {name} 中找到了 {movies}")
|
||||
return schemas.ExistMediaInfo(
|
||||
type=MediaType.MOVIE,
|
||||
server_type="ugreen",
|
||||
server=name,
|
||||
itemid=movies[0].item_id,
|
||||
)
|
||||
|
||||
itemid, tvs = s.get_tv_episodes(
|
||||
title=mediainfo.title,
|
||||
year=mediainfo.year,
|
||||
tmdb_id=mediainfo.tmdb_id,
|
||||
item_id=itemid,
|
||||
)
|
||||
if not tvs:
|
||||
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中")
|
||||
continue
|
||||
logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}")
|
||||
return schemas.ExistMediaInfo(
|
||||
type=MediaType.TV,
|
||||
seasons=tvs,
|
||||
server_type="ugreen",
|
||||
server=name,
|
||||
itemid=itemid,
|
||||
)
|
||||
return None
|
||||
|
||||
def media_statistic(
|
||||
self, server: Optional[str] = None
|
||||
) -> Optional[List[schemas.Statistic]]:
|
||||
"""
|
||||
媒体数量统计
|
||||
"""
|
||||
if server:
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
servers = [server_obj]
|
||||
else:
|
||||
servers = self.get_instances().values()
|
||||
|
||||
media_statistics = []
|
||||
for s in servers:
|
||||
media_statistic = s.get_medias_count()
|
||||
if not media_statistic:
|
||||
continue
|
||||
media_statistic.user_count = s.get_user_count()
|
||||
media_statistics.append(media_statistic)
|
||||
return media_statistics
|
||||
|
||||
def mediaserver_librarys(
|
||||
self, server: Optional[str] = None, hidden: Optional[bool] = False, **kwargs
|
||||
) -> Optional[List[schemas.MediaServerLibrary]]:
|
||||
"""
|
||||
媒体库列表
|
||||
"""
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if server_obj:
|
||||
return server_obj.get_librarys(hidden=hidden)
|
||||
return None
|
||||
|
||||
def mediaserver_items(
|
||||
self,
|
||||
server: str,
|
||||
library_id: Union[str, int],
|
||||
start_index: Optional[int] = 0,
|
||||
limit: Optional[int] = -1,
|
||||
) -> Optional[Generator]:
|
||||
"""
|
||||
获取媒体服务器项目列表
|
||||
"""
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if server_obj:
|
||||
return server_obj.get_items(library_id, start_index, limit)
|
||||
return None
|
||||
|
||||
def mediaserver_iteminfo(
|
||||
self, server: str, item_id: str
|
||||
) -> Optional[schemas.MediaServerItem]:
|
||||
"""
|
||||
媒体库项目详情
|
||||
"""
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if server_obj:
|
||||
return server_obj.get_iteminfo(item_id)
|
||||
return None
|
||||
|
||||
def mediaserver_tv_episodes(
|
||||
self, server: str, item_id: Union[str, int]
|
||||
) -> Optional[List[schemas.MediaServerSeasonInfo]]:
|
||||
"""
|
||||
获取剧集信息
|
||||
"""
|
||||
if not item_id:
|
||||
return None
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
_, seasoninfo = server_obj.get_tv_episodes(item_id=str(item_id))
|
||||
if not seasoninfo:
|
||||
return []
|
||||
return [
|
||||
schemas.MediaServerSeasonInfo(season=season, episodes=episodes)
|
||||
for season, episodes in seasoninfo.items()
|
||||
]
|
||||
|
||||
def mediaserver_playing(
|
||||
self, server: str, count: Optional[int] = 20, **kwargs
|
||||
) -> List[schemas.MediaServerPlayItem]:
|
||||
"""
|
||||
获取媒体服务器正在播放信息
|
||||
"""
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return []
|
||||
return server_obj.get_resume(num=count) or []
|
||||
|
||||
def mediaserver_play_url(
|
||||
self, server: str, item_id: Union[str, int]
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
获取媒体库播放地址
|
||||
"""
|
||||
if not item_id:
|
||||
return None
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
return server_obj.get_play_url(str(item_id))
|
||||
|
||||
def mediaserver_latest(
|
||||
self,
|
||||
server: Optional[str] = None,
|
||||
count: Optional[int] = 20,
|
||||
**kwargs,
|
||||
) -> List[schemas.MediaServerPlayItem]:
|
||||
"""
|
||||
获取媒体服务器最新入库条目
|
||||
"""
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return []
|
||||
return server_obj.get_latest(num=count) or []
|
||||
|
||||
def mediaserver_latest_images(
|
||||
self,
|
||||
server: Optional[str] = None,
|
||||
count: Optional[int] = 20,
|
||||
remote: Optional[bool] = False,
|
||||
**kwargs,
|
||||
) -> List[str]:
|
||||
"""
|
||||
获取媒体服务器最新入库条目的图片
|
||||
"""
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return []
|
||||
return server_obj.get_latest_backdrops(num=count, remote=remote) or []
|
||||
|
||||
def mediaserver_image_cookies(
|
||||
self,
|
||||
server: Optional[str] = None,
|
||||
image_url: Optional[str] = None,
|
||||
**kwargs,
|
||||
) -> Optional[str | dict]:
|
||||
"""
|
||||
获取绿联影视服务器的图片Cookies
|
||||
"""
|
||||
if not image_url:
|
||||
return None
|
||||
if server:
|
||||
server_obj: Optional[Ugreen] = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
return server_obj.get_image_cookies(image_url)
|
||||
for server_obj in self.get_instances().values():
|
||||
if cookies := server_obj.get_image_cookies(image_url):
|
||||
return cookies
|
||||
return None
|
||||
746
app/modules/ugreen/api.py
Normal file
746
app/modules/ugreen/api.py
Normal file
@@ -0,0 +1,746 @@
|
||||
import base64
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Mapping, Optional, Union
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
from requests import Session
|
||||
|
||||
from app.log import logger
|
||||
from app.utils.ugreen_crypto import UgreenCrypto
|
||||
from app.utils.url import UrlUtils
|
||||
|
||||
|
||||
@dataclass
|
||||
class ApiResult:
|
||||
code: int = -1
|
||||
msg: str = ""
|
||||
data: Any = None
|
||||
debug: Optional[str] = None
|
||||
raw: Optional[dict] = None
|
||||
|
||||
@property
|
||||
def success(self) -> bool:
|
||||
return self.code == 200
|
||||
|
||||
|
||||
class Api:
|
||||
"""
|
||||
绿联影视 API 客户端(统一加密通道)。
|
||||
|
||||
说明:
|
||||
1. 所有业务接口调用都应走 `request()`;
|
||||
2. `request()` 会自动将明文查询参数加密为 `encrypt_query`;
|
||||
3. 若响应包含 `encrypt_resp_body`,会自动完成解密后再返回。
|
||||
"""
|
||||
|
||||
__slots__ = (
|
||||
"_host",
|
||||
"_session",
|
||||
"_token",
|
||||
"_static_token",
|
||||
"_is_ugk",
|
||||
"_public_key",
|
||||
"_crypto",
|
||||
"_username",
|
||||
"_client_id",
|
||||
"_client_version",
|
||||
"_language",
|
||||
"_ug_agent",
|
||||
"_timeout",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
client_version: str = "76363",
|
||||
language: str = "zh-CN",
|
||||
ug_agent: str = "PC/WEB",
|
||||
timeout: int = 20,
|
||||
):
|
||||
self._host = self._normalize_base_url(host)
|
||||
self._session = Session()
|
||||
|
||||
self._token: Optional[str] = None
|
||||
self._static_token: Optional[str] = None
|
||||
self._is_ugk: bool = False
|
||||
self._public_key: Optional[str] = None
|
||||
self._crypto: Optional[UgreenCrypto] = None
|
||||
self._username: Optional[str] = None
|
||||
|
||||
self._client_id = f"{uuid.uuid4()}-WEB"
|
||||
self._client_version = client_version
|
||||
self._language = language
|
||||
self._ug_agent = ug_agent
|
||||
self._timeout = timeout
|
||||
|
||||
@property
|
||||
def host(self) -> str:
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def token(self) -> Optional[str]:
|
||||
return self._token
|
||||
|
||||
@property
|
||||
def static_token(self) -> Optional[str]:
|
||||
return self._static_token
|
||||
|
||||
@property
|
||||
def is_ugk(self) -> bool:
|
||||
return self._is_ugk
|
||||
|
||||
@property
|
||||
def public_key(self) -> Optional[str]:
|
||||
return self._public_key
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
关闭底层 HTTP 会话。
|
||||
"""
|
||||
self._session.close()
|
||||
|
||||
@staticmethod
|
||||
def _normalize_base_url(host: str) -> str:
|
||||
if not host:
|
||||
return ""
|
||||
host = UrlUtils.standardize_base_url(host).rstrip("/")
|
||||
parsed = urlsplit(host)
|
||||
return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/")
|
||||
|
||||
@staticmethod
|
||||
def _decode_public_key(raw: Optional[str]) -> Optional[str]:
|
||||
if not raw:
|
||||
return None
|
||||
value = str(raw).strip()
|
||||
if not value:
|
||||
return None
|
||||
if "BEGIN" in value:
|
||||
return value
|
||||
try:
|
||||
return base64.b64decode(value).decode("utf-8")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _extract_rsa_token(resp_json: dict, headers: Mapping[str, str]) -> Optional[str]:
|
||||
token = headers.get("x-rsa-token") or headers.get("X-Rsa-Token")
|
||||
if token:
|
||||
return token
|
||||
token = resp_json.get("xRsaToken") or resp_json.get("x-rsa-token")
|
||||
if token:
|
||||
return token
|
||||
data = resp_json.get("data") if isinstance(resp_json, Mapping) else None
|
||||
if isinstance(data, Mapping):
|
||||
return data.get("xRsaToken") or data.get("x-rsa-token")
|
||||
return None
|
||||
|
||||
def _common_headers(self) -> dict[str, str]:
|
||||
"""
|
||||
获取绿联 Web 端通用请求头。
|
||||
"""
|
||||
return {
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"Client-Id": self._client_id,
|
||||
"Client-Version": self._client_version,
|
||||
"UG-Agent": self._ug_agent,
|
||||
"X-Specify-Language": self._language,
|
||||
}
|
||||
|
||||
def _request_json(
|
||||
self,
|
||||
url: str,
|
||||
method: str = "GET",
|
||||
headers: Optional[dict] = None,
|
||||
params: Optional[dict] = None,
|
||||
json_data: Optional[dict] = None,
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
发送 HTTP 请求并尝试解析为 JSON。
|
||||
"""
|
||||
try:
|
||||
method = method.upper()
|
||||
if method == "POST":
|
||||
resp = self._session.post(
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
json=json_data,
|
||||
timeout=self._timeout,
|
||||
verify=False,
|
||||
)
|
||||
else:
|
||||
resp = self._session.get(
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
timeout=self._timeout,
|
||||
verify=False,
|
||||
)
|
||||
return resp.json()
|
||||
except Exception as err:
|
||||
logger.error(f"请求绿联接口失败:{url} {err}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _build_result(payload: Any) -> ApiResult:
|
||||
if not isinstance(payload, Mapping):
|
||||
return ApiResult(code=-1, msg="响应格式错误", raw=None)
|
||||
code = payload.get("code")
|
||||
try:
|
||||
code = int(code)
|
||||
except Exception:
|
||||
code = -1
|
||||
return ApiResult(
|
||||
code=code,
|
||||
msg=str(payload.get("msg") or ""),
|
||||
data=payload.get("data"),
|
||||
debug=payload.get("debug"),
|
||||
raw=dict(payload),
|
||||
)
|
||||
|
||||
def login(self, username: str, password: str, keepalive: bool = True) -> Optional[str]:
|
||||
"""
|
||||
登录绿联账号并初始化加密上下文。
|
||||
|
||||
:param username: 用户名
|
||||
:param password: 密码(会先做 RSA 分段加密)
|
||||
:param keepalive: 是否保持登录
|
||||
:return: 登录成功返回 token
|
||||
"""
|
||||
if not username or not password:
|
||||
return None
|
||||
|
||||
headers = self._common_headers()
|
||||
|
||||
try:
|
||||
check_resp = self._session.post(
|
||||
url=f"{self._host}/ugreen/v1/verify/check",
|
||||
headers=headers,
|
||||
json={"username": username},
|
||||
timeout=self._timeout,
|
||||
verify=False,
|
||||
)
|
||||
check_json = check_resp.json()
|
||||
except Exception as err:
|
||||
logger.error(f"绿联获取登录公钥失败:{err}")
|
||||
return None
|
||||
|
||||
check_result = self._build_result(check_json)
|
||||
if not check_result.success:
|
||||
logger.error(f"绿联获取登录公钥失败:{check_result.msg}")
|
||||
return None
|
||||
|
||||
rsa_token = self._extract_rsa_token(check_json, check_resp.headers)
|
||||
login_public_key = self._decode_public_key(rsa_token)
|
||||
if not login_public_key:
|
||||
logger.error("绿联获取登录公钥失败:公钥为空")
|
||||
return None
|
||||
|
||||
encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long(password)
|
||||
login_json = self._request_json(
|
||||
url=f"{self._host}/ugreen/v1/verify/login",
|
||||
method="POST",
|
||||
headers=headers,
|
||||
json_data={
|
||||
"username": username,
|
||||
"password": encrypted_password,
|
||||
"keepalive": keepalive,
|
||||
"otp": True,
|
||||
"is_simple": True,
|
||||
},
|
||||
)
|
||||
if not login_json:
|
||||
return None
|
||||
|
||||
login_result = self._build_result(login_json)
|
||||
if not login_result.success or not isinstance(login_result.data, Mapping):
|
||||
logger.error(f"绿联登录失败:{login_result.msg}")
|
||||
return None
|
||||
|
||||
token = str(login_result.data.get("token") or "").strip()
|
||||
public_key = self._decode_public_key(str(login_result.data.get("public_key") or ""))
|
||||
if not token or not public_key:
|
||||
logger.error("绿联登录失败:未返回 token/public_key")
|
||||
return None
|
||||
|
||||
self._token = token
|
||||
static_token = str(login_result.data.get("static_token") or "").strip()
|
||||
self._static_token = static_token or self._token
|
||||
self._is_ugk = bool(login_result.data.get("is_ugk"))
|
||||
self._public_key = public_key
|
||||
self._crypto = UgreenCrypto(
|
||||
public_key=self._public_key,
|
||||
token=self._token,
|
||||
client_id=self._client_id,
|
||||
client_version=self._client_version,
|
||||
ug_agent=self._ug_agent,
|
||||
language=self._language,
|
||||
)
|
||||
self._username = username
|
||||
return self._token
|
||||
|
||||
def export_session_state(self) -> Optional[dict]:
|
||||
"""
|
||||
导出当前登录会话,供持久化存储使用。
|
||||
"""
|
||||
if not self._token or not self._public_key:
|
||||
return None
|
||||
return {
|
||||
"token": self._token,
|
||||
"static_token": self._static_token,
|
||||
"is_ugk": self._is_ugk,
|
||||
"public_key": self._public_key,
|
||||
"username": self._username,
|
||||
"client_id": self._client_id,
|
||||
"client_version": self._client_version,
|
||||
"language": self._language,
|
||||
"ug_agent": self._ug_agent,
|
||||
"cookies": self._session.cookies.get_dict(),
|
||||
}
|
||||
|
||||
def import_session_state(self, state: Mapping[str, Any]) -> bool:
|
||||
"""
|
||||
从持久化数据恢复登录会话,避免重复登录。
|
||||
"""
|
||||
if not isinstance(state, Mapping):
|
||||
return False
|
||||
|
||||
token = str(state.get("token") or "").strip()
|
||||
public_key = self._decode_public_key(str(state.get("public_key") or ""))
|
||||
if not token or not public_key:
|
||||
return False
|
||||
|
||||
static_token = str(state.get("static_token") or "").strip()
|
||||
is_ugk = bool(state.get("is_ugk"))
|
||||
|
||||
# 会话可能与 client_id 绑定,需恢复原客户端信息
|
||||
client_id = str(state.get("client_id") or "").strip()
|
||||
if client_id:
|
||||
self._client_id = client_id
|
||||
|
||||
client_version = str(state.get("client_version") or "").strip()
|
||||
if client_version:
|
||||
self._client_version = client_version
|
||||
|
||||
language = str(state.get("language") or "").strip()
|
||||
if language:
|
||||
self._language = language
|
||||
|
||||
ug_agent = str(state.get("ug_agent") or "").strip()
|
||||
if ug_agent:
|
||||
self._ug_agent = ug_agent
|
||||
|
||||
username = str(state.get("username") or "").strip()
|
||||
self._username = username or None
|
||||
|
||||
cookies = state.get("cookies")
|
||||
if isinstance(cookies, Mapping):
|
||||
try:
|
||||
self._session.cookies.update(
|
||||
{
|
||||
str(k): str(v)
|
||||
for k, v in cookies.items()
|
||||
if k is not None and v is not None
|
||||
}
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._token = token
|
||||
self._static_token = static_token or self._token
|
||||
self._is_ugk = is_ugk
|
||||
self._public_key = public_key
|
||||
self._crypto = UgreenCrypto(
|
||||
public_key=self._public_key,
|
||||
token=self._token,
|
||||
client_id=self._client_id,
|
||||
client_version=self._client_version,
|
||||
ug_agent=self._ug_agent,
|
||||
language=self._language,
|
||||
)
|
||||
return True
|
||||
|
||||
def logout(self):
|
||||
"""
|
||||
登出并清理本地认证状态。
|
||||
"""
|
||||
if not self._token or not self._crypto:
|
||||
return
|
||||
try:
|
||||
req = self._crypto.build_encrypted_request(
|
||||
url=f"{self._host}/ugreen/v1/verify/logout",
|
||||
method="GET",
|
||||
params={},
|
||||
)
|
||||
self._session.get(
|
||||
req.url,
|
||||
headers=req.headers,
|
||||
params=req.params,
|
||||
timeout=self._timeout,
|
||||
verify=False,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
self._token = None
|
||||
self._static_token = None
|
||||
self._is_ugk = False
|
||||
self._public_key = None
|
||||
self._crypto = None
|
||||
self._username = None
|
||||
|
||||
def request(
|
||||
self,
|
||||
path: str,
|
||||
method: str = "GET",
|
||||
params: Optional[dict] = None,
|
||||
data: Optional[dict] = None,
|
||||
) -> ApiResult:
|
||||
"""
|
||||
统一请求入口。
|
||||
|
||||
核心行为:
|
||||
1. 自动把 `params` 明文序列化并加密为 `encrypt_query`;
|
||||
2. 自动注入绿联安全头(`X-Ugreen-*`);
|
||||
3. 对 `POST/PUT/PATCH` 的 JSON 体加密;
|
||||
4. 自动解密 `encrypt_resp_body`。
|
||||
|
||||
:param path: `/ugreen/` 后的相对路径,例如 `v1/video/homepage/media_list`
|
||||
:param method: HTTP 方法
|
||||
:param params: 明文查询参数(无需自己处理 encrypt_query)
|
||||
:param data: 明文 JSON 请求体(自动加密)
|
||||
"""
|
||||
if not self._crypto:
|
||||
return ApiResult(code=-1, msg="未登录")
|
||||
|
||||
api_path = path.strip("/")
|
||||
# 由加密工具自动构建 encrypt_query 与加密请求体
|
||||
req = self._crypto.build_encrypted_request(
|
||||
url=f"{self._host}/ugreen/{api_path}",
|
||||
method=method.upper(),
|
||||
params=params or {},
|
||||
data=data,
|
||||
encrypt_body=method.upper() in {"POST", "PUT", "PATCH"},
|
||||
)
|
||||
|
||||
payload = self._request_json(
|
||||
url=req.url,
|
||||
method=method,
|
||||
headers=req.headers,
|
||||
params=req.params,
|
||||
json_data=req.json,
|
||||
)
|
||||
if payload is None:
|
||||
return ApiResult(code=-1, msg="接口请求失败")
|
||||
|
||||
# 响应若包含 encrypt_resp_body,这里会自动解密
|
||||
decrypted = self._crypto.decrypt_response(payload, req.aes_key)
|
||||
return self._build_result(decrypted)
|
||||
|
||||
def current_user(self) -> Optional[dict]:
|
||||
"""
|
||||
获取当前登录用户信息。
|
||||
"""
|
||||
result = self.request("v1/user/current/user")
|
||||
if not result.success or not isinstance(result.data, Mapping):
|
||||
return None
|
||||
return dict(result.data)
|
||||
|
||||
def media_list(self) -> list[dict]:
|
||||
"""
|
||||
获取首页媒体库列表(`media_lib_info_list`)。
|
||||
"""
|
||||
result = self.request("v1/video/homepage/media_list")
|
||||
if not result.success or not isinstance(result.data, Mapping):
|
||||
return []
|
||||
items = result.data.get("media_lib_info_list")
|
||||
return items if isinstance(items, list) else []
|
||||
|
||||
def media_lib_users(self) -> list[dict]:
|
||||
"""
|
||||
获取媒体库用户列表。
|
||||
"""
|
||||
result = self.request("v1/video/media_lib/get_user_list")
|
||||
if not result.success or not isinstance(result.data, Mapping):
|
||||
return []
|
||||
users = result.data.get("user_info_arr")
|
||||
return users if isinstance(users, list) else []
|
||||
|
||||
def recently_played(self, page: int = 1, page_size: int = 12) -> Optional[dict]:
|
||||
"""
|
||||
获取继续观看列表。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/recently_played/get",
|
||||
params={
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"language": self._language,
|
||||
"create_time_order": "false",
|
||||
},
|
||||
)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def recently_updated(self, page: int = 1, page_size: int = 20) -> Optional[dict]:
|
||||
"""
|
||||
获取最近更新列表。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/recently_update/get",
|
||||
params={
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"language": self._language,
|
||||
"create_time_order": "false",
|
||||
},
|
||||
)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def recently_played_info(self, item_id: Union[str, int]) -> Optional[dict]:
|
||||
"""
|
||||
获取单个视频的播放状态与基础详情信息。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/recently_played/info",
|
||||
params={
|
||||
"ug_video_info_id": item_id,
|
||||
"version_control": "true",
|
||||
},
|
||||
)
|
||||
if result.code in {200, 1303} and isinstance(result.data, Mapping):
|
||||
return dict(result.data)
|
||||
return None
|
||||
|
||||
def search(self, keyword: str, offset: int = 0, limit: int = 200) -> Optional[dict]:
|
||||
"""
|
||||
搜索媒体(电影/剧集)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/search",
|
||||
params={
|
||||
"language": self._language,
|
||||
"search_type": 1,
|
||||
"offset": offset,
|
||||
"limit": limit,
|
||||
"keyword": keyword,
|
||||
},
|
||||
)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def video_all(self, classification: int, page: int = 1, page_size: int = 20) -> Optional[dict]:
|
||||
"""
|
||||
获取 `v1/video/all` 分类列表。
|
||||
|
||||
常用分类:
|
||||
-102: 电影
|
||||
-103: 电视剧
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/all",
|
||||
params={
|
||||
"page": page,
|
||||
"pageSize": page_size,
|
||||
"classification": classification,
|
||||
"sort_type": 2,
|
||||
"order_type": 2,
|
||||
"release_date_begin": -9999999999,
|
||||
"release_date_end": -9999999999,
|
||||
"identify_status": 0,
|
||||
"watch_status": -1,
|
||||
"ug_style_id": 0,
|
||||
"ug_country_id": 0,
|
||||
"clarity": -1,
|
||||
},
|
||||
)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def poster_wall_get_folder(
|
||||
self,
|
||||
path: Optional[str] = None,
|
||||
page: int = 1,
|
||||
page_size: int = 100,
|
||||
sort_type: int = 1,
|
||||
order_type: int = 1,
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
获取海报墙文件夹与条目(可按目录路径递归展开)。
|
||||
"""
|
||||
params: Dict[str, Any] = {
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"sort_type": sort_type,
|
||||
"order_type": order_type,
|
||||
}
|
||||
if path:
|
||||
params["path"] = path
|
||||
result = self.request("v1/video/poster_wall/media_lib/get_folder", params=params)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def get_movie(
|
||||
self,
|
||||
item_id: Union[str, int],
|
||||
media_lib_set_id: Union[str, int],
|
||||
path: Optional[str] = None,
|
||||
folder_path: Optional[str] = None,
|
||||
) -> Optional[dict]:
|
||||
"""
|
||||
获取电影详情。
|
||||
"""
|
||||
params: Dict[str, Any] = {
|
||||
"id": item_id,
|
||||
"media_lib_set_id": media_lib_set_id,
|
||||
"fileVersion": "true",
|
||||
}
|
||||
if path:
|
||||
params["path"] = path
|
||||
if folder_path:
|
||||
params["folder_path"] = folder_path
|
||||
result = self.request("v1/video/details/getMovie", params=params)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def get_tv(self, item_id: Union[str, int], folder_path: str = "ALL") -> Optional[dict]:
|
||||
"""
|
||||
获取剧集详情(含季/集信息)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v2/video/details/getTV",
|
||||
params={
|
||||
"ug_video_info_id": item_id,
|
||||
"folder_path": folder_path,
|
||||
},
|
||||
)
|
||||
return result.data if result.success and isinstance(result.data, Mapping) else None
|
||||
|
||||
def scan(self, media_lib_set_id: Union[str, int], scan_type: int = 2, op_type: int = 2) -> bool:
|
||||
"""
|
||||
触发媒体库扫描。
|
||||
|
||||
:param media_lib_set_id: 媒体库 ID
|
||||
:param scan_type: 扫描类型(1: 新添加和修改, 2: 补充缺失, 3: 覆盖扫描)
|
||||
:param op_type: 操作类型(网页端常用 2)
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/media_lib/scan",
|
||||
params={
|
||||
"op_type": op_type,
|
||||
"media_lib_set_id": media_lib_set_id,
|
||||
"media_lib_scan_type": scan_type,
|
||||
},
|
||||
)
|
||||
return result.success
|
||||
|
||||
def scan_status(self, only_brief: bool = True) -> list[dict]:
|
||||
"""
|
||||
获取媒体库扫描状态。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/media_lib/scan/status",
|
||||
params={"only_brief": "true" if only_brief else "false"},
|
||||
)
|
||||
if not result.success or not isinstance(result.data, Mapping):
|
||||
return []
|
||||
arr = result.data.get("media_lib_scan_status_arr")
|
||||
return arr if isinstance(arr, list) else []
|
||||
|
||||
def preferences_all(self) -> Optional[Any]:
|
||||
"""
|
||||
获取影视偏好设置(`v1/video/preferences/all`)。
|
||||
"""
|
||||
result = self.request("v1/video/preferences/all")
|
||||
return result.data if result.success else None
|
||||
|
||||
def history_get(self, num: int = 10) -> Optional[Any]:
|
||||
"""
|
||||
获取历史记录(`v1/video/history/get`)。
|
||||
"""
|
||||
result = self.request("v1/video/history/get", params={"num": num})
|
||||
return result.data if result.success else None
|
||||
|
||||
def data_source_get_config(self) -> Optional[Any]:
|
||||
"""
|
||||
获取数据源配置(`v1/video/data_source/get_config`)。
|
||||
"""
|
||||
result = self.request("v1/video/data_source/get_config")
|
||||
return result.data if result.success else None
|
||||
|
||||
def homepage_slider(
|
||||
self, language: Optional[str] = None, app_name: str = "web"
|
||||
) -> Optional[Any]:
|
||||
"""
|
||||
获取首页轮播数据(`v1/video/homepage/slider`)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/homepage/slider",
|
||||
params={
|
||||
"language": language or self._language,
|
||||
"app_name": app_name,
|
||||
},
|
||||
)
|
||||
return result.data if result.success else None
|
||||
|
||||
def media_lib_guide_init(self) -> Optional[Any]:
|
||||
"""
|
||||
获取媒体库引导初始化信息(`v1/video/media_lib/guide_init`)。
|
||||
"""
|
||||
result = self.request("v1/video/media_lib/guide_init")
|
||||
return result.data if result.success else None
|
||||
|
||||
def media_lib_filter_options(
|
||||
self, media_type: int = 0, language: Optional[str] = None
|
||||
) -> Optional[Any]:
|
||||
"""
|
||||
获取媒体库筛选项(`v1/video/media_lib/filter/options`)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/media_lib/filter/options",
|
||||
params={
|
||||
"type": media_type,
|
||||
"language": language or self._language,
|
||||
},
|
||||
)
|
||||
return result.data if result.success else None
|
||||
|
||||
def guide(self, guide_position: int = 1, client_type: int = 1) -> Optional[Any]:
|
||||
"""
|
||||
获取引导位数据(`v1/video/guide`)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/guide",
|
||||
params={
|
||||
"guide_position": guide_position,
|
||||
"client_type": client_type,
|
||||
},
|
||||
)
|
||||
return result.data if result.success else None
|
||||
|
||||
def homepage_v2(self, language: Optional[str] = None) -> Optional[Any]:
|
||||
"""
|
||||
获取新版首页聚合数据(`v2/video/homepage`)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v2/video/homepage",
|
||||
params={"language": language or self._language},
|
||||
)
|
||||
return result.data if result.success else None
|
||||
|
||||
def media_lib_init_user_permission(self) -> Optional[Any]:
|
||||
"""
|
||||
初始化用户媒体库权限(`v1/video/media_lib/init_user_permission`)。
|
||||
"""
|
||||
result = self.request("v1/video/media_lib/init_user_permission")
|
||||
return result.data if result.success else None
|
||||
|
||||
def media_lib_get_all(
|
||||
self, req_type: int = 2, language: Optional[str] = None
|
||||
) -> Optional[Any]:
|
||||
"""
|
||||
获取全部媒体库集合(`v1/video/media_lib/get_all`)。
|
||||
"""
|
||||
result = self.request(
|
||||
"v1/video/media_lib/get_all",
|
||||
params={
|
||||
"mediaLib_get_all_req_type": req_type,
|
||||
"language": language or self._language,
|
||||
},
|
||||
)
|
||||
return result.data if result.success else None
|
||||
945
app/modules/ugreen/ugreen.py
Normal file
945
app/modules/ugreen/ugreen.py
Normal file
@@ -0,0 +1,945 @@
|
||||
import hashlib
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Generator, List, Mapping, Optional, Union
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from app import schemas
|
||||
from app.db.systemconfig_oper import SystemConfigOper
|
||||
from app.log import logger
|
||||
from app.modules.ugreen.api import Api
|
||||
from app.schemas import MediaType
|
||||
from app.schemas.types import SystemConfigKey
|
||||
from app.utils.url import UrlUtils
|
||||
|
||||
|
||||
class Ugreen:
|
||||
_username: Optional[str] = None
|
||||
_password: Optional[str] = None
|
||||
|
||||
_userinfo: Optional[dict] = None
|
||||
_host: Optional[str] = None
|
||||
_playhost: Optional[str] = None
|
||||
|
||||
_libraries: dict[str, dict] = {}
|
||||
_library_paths: dict[str, str] = {}
|
||||
_sync_libraries: List[str] = []
|
||||
_scan_type: int = 2
|
||||
|
||||
_api: Optional[Api] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: Optional[str] = None,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
play_host: Optional[str] = None,
|
||||
sync_libraries: Optional[list] = None,
|
||||
scan_mode: Optional[Union[str, int]] = None,
|
||||
scan_type: Optional[Union[str, int]] = None,
|
||||
**kwargs,
|
||||
):
|
||||
if not host or not username or not password:
|
||||
logger.error("绿联影视配置不完整!!")
|
||||
return
|
||||
|
||||
self._host = host
|
||||
self._username = username
|
||||
self._password = password
|
||||
self._sync_libraries = sync_libraries or []
|
||||
# 绿联媒体库扫描模式:
|
||||
# 1 新添加和修改、2 补充缺失、3 覆盖扫描
|
||||
self._scan_type = self.__resolve_scan_type(scan_mode=scan_mode, scan_type=scan_type)
|
||||
|
||||
if play_host:
|
||||
self._playhost = UrlUtils.standardize_base_url(play_host).rstrip("/")
|
||||
|
||||
if not self.reconnect():
|
||||
logger.error(f"请检查服务端地址 {host}")
|
||||
|
||||
@property
|
||||
def api(self) -> Optional[Api]:
|
||||
return self._api
|
||||
|
||||
def close(self):
|
||||
self.disconnect()
|
||||
|
||||
def is_configured(self) -> bool:
|
||||
return bool(self._host and self._username and self._password)
|
||||
|
||||
def is_authenticated(self) -> bool:
|
||||
return (
|
||||
self.is_configured()
|
||||
and self._api is not None
|
||||
and self._api.token is not None
|
||||
and self._userinfo is not None
|
||||
)
|
||||
|
||||
def is_inactive(self) -> bool:
|
||||
if not self.is_authenticated():
|
||||
return True
|
||||
self._userinfo = self._api.current_user() if self._api else None
|
||||
return self._userinfo is None
|
||||
|
||||
def __session_cache_key(self) -> str:
|
||||
"""
|
||||
生成当前绿联实例的会话缓存键(基于 host + username)。
|
||||
"""
|
||||
normalized_host = UrlUtils.standardize_base_url(self._host or "").rstrip("/").lower()
|
||||
username = (self._username or "").strip().lower()
|
||||
raw = f"{normalized_host}|{username}"
|
||||
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
||||
|
||||
def __password_digest(self) -> str:
|
||||
"""
|
||||
存储密码摘要用于检测配置是否变更,避免明文落盘。
|
||||
"""
|
||||
return hashlib.sha256((self._password or "").encode("utf-8")).hexdigest()
|
||||
|
||||
@staticmethod
|
||||
def __load_all_session_cache() -> dict:
|
||||
sessions = SystemConfigOper().get(SystemConfigKey.UgreenSessionCache)
|
||||
return sessions if isinstance(sessions, dict) else {}
|
||||
|
||||
@staticmethod
|
||||
def __save_all_session_cache(sessions: dict):
|
||||
SystemConfigOper().set(SystemConfigKey.UgreenSessionCache, sessions)
|
||||
|
||||
def __remove_persisted_session(self):
|
||||
cache_key = self.__session_cache_key()
|
||||
sessions = self.__load_all_session_cache()
|
||||
if cache_key in sessions:
|
||||
sessions.pop(cache_key, None)
|
||||
self.__save_all_session_cache(sessions)
|
||||
|
||||
def __save_persisted_session(self):
|
||||
if not self._api:
|
||||
return
|
||||
session_state = self._api.export_session_state()
|
||||
if not session_state:
|
||||
return
|
||||
|
||||
sessions = self.__load_all_session_cache()
|
||||
cache_key = self.__session_cache_key()
|
||||
sessions[cache_key] = {
|
||||
**session_state,
|
||||
"host": UrlUtils.standardize_base_url(self._host or "").rstrip("/"),
|
||||
"username": self._username,
|
||||
"password_digest": self.__password_digest(),
|
||||
"updated_at": int(datetime.now().timestamp()),
|
||||
}
|
||||
self.__save_all_session_cache(sessions)
|
||||
|
||||
def __restore_persisted_session(self) -> bool:
|
||||
cache_key = self.__session_cache_key()
|
||||
sessions = self.__load_all_session_cache()
|
||||
cached = sessions.get(cache_key)
|
||||
if not isinstance(cached, Mapping):
|
||||
return False
|
||||
|
||||
# 配置变更(尤其密码变更)后,不复用旧会话
|
||||
if cached.get("password_digest") != self.__password_digest():
|
||||
logger.info(f"绿联影视 {self._username} 检测到密码变更,清理旧会话缓存")
|
||||
self.__remove_persisted_session()
|
||||
return False
|
||||
|
||||
api = Api(host=self._host)
|
||||
if not api.import_session_state(cached):
|
||||
api.close()
|
||||
self.__remove_persisted_session()
|
||||
return False
|
||||
|
||||
userinfo = api.current_user()
|
||||
if not userinfo:
|
||||
# 会话失效,清理缓存并走正常登录
|
||||
api.close()
|
||||
self.__remove_persisted_session()
|
||||
logger.info(f"绿联影视 {self._username} 持久化会话已失效,准备重新登录")
|
||||
return False
|
||||
|
||||
self._api = api
|
||||
self._userinfo = userinfo
|
||||
logger.debug(f"{self._username} 已复用绿联影视持久化会话")
|
||||
return True
|
||||
|
||||
def reconnect(self) -> bool:
|
||||
if not self.is_configured():
|
||||
return False
|
||||
|
||||
# 关闭旧连接(不主动登出,避免破坏可复用会话)
|
||||
self.disconnect(logout=False)
|
||||
|
||||
if self.__restore_persisted_session():
|
||||
self.get_librarys()
|
||||
return True
|
||||
|
||||
self._api = Api(host=self._host)
|
||||
if self._api.login(self._username, self._password) is None:
|
||||
self.__remove_persisted_session()
|
||||
return False
|
||||
|
||||
self._userinfo = self._api.current_user()
|
||||
if not self._userinfo:
|
||||
self.__remove_persisted_session()
|
||||
return False
|
||||
|
||||
# 登录成功后持久化参数,下次优先复用
|
||||
self.__save_persisted_session()
|
||||
logger.debug(f"{self._username} 成功登录绿联影视")
|
||||
self.get_librarys()
|
||||
return True
|
||||
|
||||
def disconnect(self, logout: bool = False):
|
||||
if self._api:
|
||||
if logout:
|
||||
# 显式登出时同步清理本地缓存
|
||||
self._api.logout()
|
||||
self.__remove_persisted_session()
|
||||
self._api.close()
|
||||
self._api = None
|
||||
self._userinfo = None
|
||||
logger.debug(f"{self._username} 已断开绿联影视")
|
||||
|
||||
@staticmethod
|
||||
def __normalize_dir_path(path: Union[str, Path, None]) -> str:
|
||||
if path is None:
|
||||
return ""
|
||||
value = str(path).replace("\\", "/").rstrip("/")
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def __is_subpath(path: Union[str, Path, None], parent: Union[str, Path, None]) -> bool:
|
||||
path_str = Ugreen.__normalize_dir_path(path)
|
||||
parent_str = Ugreen.__normalize_dir_path(parent)
|
||||
if not path_str or not parent_str:
|
||||
return False
|
||||
return path_str == parent_str or path_str.startswith(parent_str + "/")
|
||||
|
||||
def __build_image_stream_url(self, source_url: str, size: int = 1) -> Optional[str]:
|
||||
"""
|
||||
通过绿联 getImaStream 中转图片,规避 scraper.ugnas.com 403 问题。
|
||||
"""
|
||||
if not self._api:
|
||||
return None
|
||||
|
||||
auth_token = self._api.static_token or self._api.token
|
||||
if not auth_token:
|
||||
return None
|
||||
|
||||
params = {
|
||||
"app_name": "web",
|
||||
"name": source_url,
|
||||
"size": size,
|
||||
}
|
||||
if self._api.is_ugk:
|
||||
params["ugk"] = auth_token
|
||||
else:
|
||||
params["token"] = auth_token
|
||||
|
||||
return UrlUtils.combine_url(
|
||||
host=self._api.host,
|
||||
path="/ugreen/v2/video/getImaStream",
|
||||
query=params,
|
||||
)
|
||||
|
||||
def __resolve_image(self, path: Optional[str]) -> Optional[str]:
|
||||
if not path:
|
||||
return None
|
||||
if path.startswith("http://") or path.startswith("https://"):
|
||||
parsed = urlparse(path)
|
||||
if parsed.netloc.lower() == "scraper.ugnas.com":
|
||||
# scraper 链接优先改为本机 getImaStream,避免签名过期导致 403
|
||||
if image_stream_url := self.__build_image_stream_url(path):
|
||||
return image_stream_url
|
||||
|
||||
# 绿联返回的 scraper.ugnas.com 图片常带 auth_key 时效签名,
|
||||
# 过期后会直接 403。这里提前过滤,避免前端出现裂图。
|
||||
if self.__is_expired_signed_image(path):
|
||||
return None
|
||||
return path
|
||||
# 绿联本地图片路径需要额外鉴权头,MP图片代理当前仅支持Cookie,故先忽略本地路径。
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def __is_expired_signed_image(url: str) -> bool:
|
||||
"""
|
||||
判断绿联 scraper 签名图是否已过期。
|
||||
|
||||
auth_key 结构通常为:
|
||||
`{过期时间戳}-{随机串}-...`
|
||||
"""
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
if parsed.netloc.lower() != "scraper.ugnas.com":
|
||||
return False
|
||||
auth_key = parse_qs(parsed.query).get("auth_key", [None])[0]
|
||||
if not auth_key:
|
||||
return False
|
||||
expire_part = str(auth_key).split("-", 1)[0]
|
||||
expire_ts = int(expire_part)
|
||||
now_ts = int(datetime.now().timestamp())
|
||||
return expire_ts <= now_ts
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def __parse_year(video_info: dict) -> Optional[Union[str, int]]:
|
||||
year = video_info.get("year")
|
||||
if isinstance(year, int) and year > 0:
|
||||
return year
|
||||
release_date = video_info.get("release_date")
|
||||
if isinstance(release_date, (int, float)) and release_date > 0:
|
||||
try:
|
||||
return datetime.fromtimestamp(release_date).year
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def __map_item_type(video_type: Any) -> Optional[str]:
|
||||
if video_type == 2:
|
||||
return "Series"
|
||||
if video_type == 1:
|
||||
return "Movie"
|
||||
if video_type == 3:
|
||||
return "Collection"
|
||||
if video_type == 0:
|
||||
return "Folder"
|
||||
return "Video"
|
||||
|
||||
@staticmethod
|
||||
def __build_media_server_item(video_info: dict, play_status: Optional[dict] = None):
|
||||
user_state = schemas.MediaServerItemUserState()
|
||||
if isinstance(play_status, dict):
|
||||
progress = play_status.get("progress")
|
||||
watch_status = play_status.get("watch_status")
|
||||
if watch_status == 2:
|
||||
user_state.played = True
|
||||
if isinstance(progress, (int, float)) and progress > 0:
|
||||
user_state.resume = progress < 1
|
||||
user_state.percentage = progress * 100.0
|
||||
last_play_time = play_status.get("last_access_time") or play_status.get("LastPlayTime")
|
||||
if isinstance(last_play_time, (int, float)) and last_play_time > 0:
|
||||
user_state.last_played_date = str(int(last_play_time))
|
||||
|
||||
tmdb_id = video_info.get("tmdb_id")
|
||||
if not isinstance(tmdb_id, int) or tmdb_id <= 0:
|
||||
tmdb_id = None
|
||||
|
||||
item_id = video_info.get("ug_video_info_id")
|
||||
if item_id is None:
|
||||
return None
|
||||
|
||||
return schemas.MediaServerItem(
|
||||
server="ugreen",
|
||||
library=video_info.get("media_lib_set_id"),
|
||||
item_id=str(item_id),
|
||||
item_type=Ugreen.__map_item_type(video_info.get("type")),
|
||||
title=video_info.get("name"),
|
||||
original_title=video_info.get("original_name"),
|
||||
year=Ugreen.__parse_year(video_info),
|
||||
tmdbid=tmdb_id,
|
||||
user_state=user_state,
|
||||
)
|
||||
|
||||
def __build_root_url(self) -> str:
|
||||
"""
|
||||
统一返回 NAS Web 根地址作为跳转链接,避免失效深链。
|
||||
"""
|
||||
host = self._playhost or (self._api.host if self._api else "")
|
||||
if not host:
|
||||
return ""
|
||||
return f"{host.rstrip('/')}/"
|
||||
|
||||
def __build_play_url(self, item_id: Union[str, int], video_type: Any, media_lib_set_id: Any) -> str:
|
||||
# 绿联深链在部分版本会失效,统一回落到 NAS 根地址。
|
||||
return self.__build_root_url()
|
||||
|
||||
def __build_play_item_from_wrapper(self, wrapper: dict) -> Optional[schemas.MediaServerPlayItem]:
|
||||
video_info = wrapper.get("video_info") if isinstance(wrapper.get("video_info"), dict) else wrapper
|
||||
if not isinstance(video_info, dict):
|
||||
return None
|
||||
|
||||
item_id = video_info.get("ug_video_info_id")
|
||||
if item_id is None:
|
||||
return None
|
||||
|
||||
play_status = wrapper.get("play_status") if isinstance(wrapper.get("play_status"), dict) else {}
|
||||
progress = play_status.get("progress") if isinstance(play_status.get("progress"), (int, float)) else 0
|
||||
|
||||
if video_info.get("type") == 2:
|
||||
subtitle = play_status.get("tv_name") or "剧集"
|
||||
media_type = MediaType.TV.value
|
||||
else:
|
||||
subtitle = "电影" if video_info.get("type") == 1 else "视频"
|
||||
media_type = MediaType.MOVIE.value
|
||||
|
||||
image = self.__resolve_image(video_info.get("poster_path")) or self.__resolve_image(
|
||||
video_info.get("backdrop_path")
|
||||
)
|
||||
|
||||
return schemas.MediaServerPlayItem(
|
||||
id=str(item_id),
|
||||
title=video_info.get("name"),
|
||||
subtitle=subtitle,
|
||||
type=media_type,
|
||||
image=image,
|
||||
link=self.__build_play_url(item_id, video_info.get("type"), video_info.get("media_lib_set_id")),
|
||||
percent=max(0.0, min(100.0, progress * 100.0)),
|
||||
server_type="ugreen",
|
||||
use_cookies=False,
|
||||
)
|
||||
|
||||
def __infer_library_type(self, name: str, path: Optional[str]) -> str:
|
||||
name = name or ""
|
||||
path = path or ""
|
||||
if "电视剧" in path or any(key in name for key in ["剧", "综艺", "动漫", "纪录片"]):
|
||||
return MediaType.TV.value
|
||||
if "电影" in path or "电影" in name:
|
||||
return MediaType.MOVIE.value
|
||||
return MediaType.UNKNOWN.value
|
||||
|
||||
def __is_library_blocked(self, library_id: str) -> bool:
|
||||
return (
|
||||
True
|
||||
if (
|
||||
self._sync_libraries
|
||||
and "all" not in self._sync_libraries
|
||||
and str(library_id) not in self._sync_libraries
|
||||
)
|
||||
else False
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def __resolve_scan_type(
|
||||
scan_mode: Optional[Union[str, int]] = None,
|
||||
scan_type: Optional[Union[str, int]] = None,
|
||||
) -> int:
|
||||
"""
|
||||
解析绿联扫描模式并转为 `media_lib_scan_type`。
|
||||
|
||||
支持值:
|
||||
- 1 / new_and_modified: 新添加和修改
|
||||
- 2 / supplement_missing: 补充缺失
|
||||
- 3 / full_override: 覆盖扫描
|
||||
"""
|
||||
# 优先使用显式 scan_type 数值配置。
|
||||
for value in (scan_type, scan_mode):
|
||||
try:
|
||||
parsed = int(value) # type: ignore[arg-type]
|
||||
if parsed in (1, 2, 3):
|
||||
return parsed
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
mode = str(scan_mode or "").strip().lower()
|
||||
mode_map = {
|
||||
"new_and_modified": 1,
|
||||
"new_modified": 1,
|
||||
"add": 1,
|
||||
"added": 1,
|
||||
"new": 1,
|
||||
"scan_new_modified": 1,
|
||||
"supplement_missing": 2,
|
||||
"supplement": 2,
|
||||
"additional": 2,
|
||||
"missing": 2,
|
||||
"scan_missing": 2,
|
||||
"full_override": 3,
|
||||
"override": 3,
|
||||
"cover": 3,
|
||||
"replace": 3,
|
||||
"scan_override": 3,
|
||||
}
|
||||
return mode_map.get(mode, 2)
|
||||
|
||||
def __scan_library(self, library_id: str, scan_type: Optional[int] = None) -> bool:
|
||||
if not self._api:
|
||||
return False
|
||||
return self._api.scan(
|
||||
media_lib_set_id=library_id,
|
||||
scan_type=scan_type or self._scan_type,
|
||||
op_type=2,
|
||||
)
|
||||
|
||||
def __load_library_paths(self) -> dict[str, str]:
|
||||
if not self._api:
|
||||
return {}
|
||||
|
||||
paths: dict[str, str] = {}
|
||||
page = 1
|
||||
while True:
|
||||
data = self._api.poster_wall_get_folder(page=page, page_size=100)
|
||||
if not data:
|
||||
break
|
||||
|
||||
for folder in data.get("folder_arr") or []:
|
||||
lib_id = folder.get("media_lib_set_id")
|
||||
lib_path = folder.get("path")
|
||||
if lib_id is not None and lib_path:
|
||||
paths[str(lib_id)] = str(lib_path)
|
||||
|
||||
if data.get("is_last_page") is True:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return paths
|
||||
|
||||
def get_librarys(self, hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return []
|
||||
|
||||
media_libs = self._api.media_list()
|
||||
self._library_paths = self.__load_library_paths()
|
||||
libraries = []
|
||||
self._libraries = {}
|
||||
|
||||
for lib in media_libs:
|
||||
lib_id = str(lib.get("media_lib_set_id"))
|
||||
if hidden and self.__is_library_blocked(lib_id):
|
||||
continue
|
||||
|
||||
lib_name = lib.get("media_name") or ""
|
||||
lib_path = self._library_paths.get(lib_id)
|
||||
library_type = self.__infer_library_type(lib_name, lib_path)
|
||||
|
||||
poster_paths = lib.get("poster_paths") or []
|
||||
backdrop_paths = lib.get("backdrop_paths") or []
|
||||
image_list = list(
|
||||
filter(
|
||||
None,
|
||||
[self.__resolve_image(p) for p in [*poster_paths, *backdrop_paths]],
|
||||
)
|
||||
)
|
||||
|
||||
self._libraries[lib_id] = {
|
||||
"id": lib_id,
|
||||
"name": lib_name,
|
||||
"path": lib_path,
|
||||
"type": library_type,
|
||||
"video_count": lib.get("video_count") or 0,
|
||||
}
|
||||
|
||||
libraries.append(
|
||||
schemas.MediaServerLibrary(
|
||||
server="ugreen",
|
||||
id=lib_id,
|
||||
name=lib_name,
|
||||
type=library_type,
|
||||
path=lib_path,
|
||||
image_list=image_list,
|
||||
link=self.__build_root_url(),
|
||||
server_type="ugreen",
|
||||
use_cookies=False,
|
||||
)
|
||||
)
|
||||
|
||||
return libraries
|
||||
|
||||
def get_user_count(self) -> int:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return 0
|
||||
users = self._api.media_lib_users()
|
||||
return len(users)
|
||||
|
||||
def get_medias_count(self) -> schemas.Statistic:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return schemas.Statistic()
|
||||
|
||||
movie_data = self._api.video_all(classification=-102, page=1, page_size=1) or {}
|
||||
tv_data = self._api.video_all(classification=-103, page=1, page_size=1) or {}
|
||||
|
||||
return schemas.Statistic(
|
||||
movie_count=int(movie_data.get("total_num") or 0),
|
||||
tv_count=int(tv_data.get("total_num") or 0),
|
||||
# 绿联当前不统计剧集总数,返回 None 由前端展示“未获取”。
|
||||
episode_count=None,
|
||||
)
|
||||
|
||||
def authenticate(self, username: str, password: str) -> Optional[str]:
|
||||
if not username or not password or not self._host:
|
||||
return None
|
||||
|
||||
api = Api(self._host)
|
||||
try:
|
||||
return api.login(username, password)
|
||||
finally:
|
||||
api.logout()
|
||||
api.close()
|
||||
|
||||
def __extract_video_info_list(self, bucket: Any) -> list[dict]:
|
||||
if not isinstance(bucket, Mapping):
|
||||
return []
|
||||
video_arr = bucket.get("video_arr")
|
||||
if not isinstance(video_arr, list):
|
||||
return []
|
||||
result = []
|
||||
for item in video_arr:
|
||||
if not isinstance(item, Mapping):
|
||||
continue
|
||||
info = item.get("video_info")
|
||||
if isinstance(info, Mapping):
|
||||
result.append(dict(info))
|
||||
return result
|
||||
|
||||
def get_movies(
|
||||
self, title: str, year: Optional[str] = None, tmdb_id: Optional[int] = None
|
||||
) -> Optional[List[schemas.MediaServerItem]]:
|
||||
if not self.is_authenticated() or not self._api or not title:
|
||||
return None
|
||||
|
||||
data = self._api.search(title)
|
||||
if not data:
|
||||
return []
|
||||
|
||||
movies = []
|
||||
for info in self.__extract_video_info_list(data.get("movies_list")):
|
||||
info_tmdb = info.get("tmdb_id")
|
||||
if tmdb_id and tmdb_id != info_tmdb:
|
||||
continue
|
||||
if title not in [info.get("name"), info.get("original_name")]:
|
||||
continue
|
||||
item_year = info.get("year")
|
||||
if year and str(item_year) != str(year):
|
||||
continue
|
||||
media_item = self.__build_media_server_item(info)
|
||||
if media_item:
|
||||
movies.append(media_item)
|
||||
return movies
|
||||
|
||||
def __search_tv_item(self, title: str, year: Optional[str] = None, tmdb_id: Optional[int] = None) -> Optional[dict]:
|
||||
if not self._api:
|
||||
return None
|
||||
data = self._api.search(title)
|
||||
if not data:
|
||||
return None
|
||||
|
||||
for info in self.__extract_video_info_list(data.get("tv_list")):
|
||||
if tmdb_id and tmdb_id != info.get("tmdb_id"):
|
||||
continue
|
||||
if title not in [info.get("name"), info.get("original_name")]:
|
||||
continue
|
||||
item_year = info.get("year")
|
||||
if year and str(item_year) != str(year):
|
||||
continue
|
||||
return info
|
||||
return None
|
||||
|
||||
def get_tv_episodes(
|
||||
self,
|
||||
item_id: Optional[str] = None,
|
||||
title: Optional[str] = None,
|
||||
year: Optional[str] = None,
|
||||
tmdb_id: Optional[int] = None,
|
||||
season: Optional[int] = None,
|
||||
) -> tuple[Optional[str], Optional[Dict[int, list]]]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None, None
|
||||
|
||||
if not item_id:
|
||||
if not title:
|
||||
return None, None
|
||||
if not (tv_info := self.__search_tv_item(title, year, tmdb_id)):
|
||||
return None, None
|
||||
found_item_id = tv_info.get("ug_video_info_id")
|
||||
if found_item_id is None:
|
||||
return None, None
|
||||
item_id = str(found_item_id)
|
||||
else:
|
||||
item_id = str(item_id)
|
||||
|
||||
item_info = self.get_iteminfo(item_id)
|
||||
if not item_info:
|
||||
return None, {}
|
||||
if tmdb_id and item_info.tmdbid and tmdb_id != item_info.tmdbid:
|
||||
return None, {}
|
||||
|
||||
tv_detail = self._api.get_tv(item_id, folder_path="ALL")
|
||||
if not tv_detail:
|
||||
return None, {}
|
||||
|
||||
season_map = {}
|
||||
for info in tv_detail.get("season_info") or []:
|
||||
if not isinstance(info, dict):
|
||||
continue
|
||||
category_id = info.get("category_id")
|
||||
season_num = info.get("season_num")
|
||||
if category_id and isinstance(season_num, int):
|
||||
season_map[str(category_id)] = season_num
|
||||
|
||||
season_episodes: Dict[int, list] = {}
|
||||
for ep in tv_detail.get("tv_info") or []:
|
||||
if not isinstance(ep, dict):
|
||||
continue
|
||||
episode = ep.get("episode")
|
||||
if not isinstance(episode, int):
|
||||
continue
|
||||
season_num = season_map.get(str(ep.get("category_id")), 1)
|
||||
if season is not None and season_num != season:
|
||||
continue
|
||||
season_episodes.setdefault(season_num, []).append(episode)
|
||||
|
||||
for season_num in list(season_episodes.keys()):
|
||||
season_episodes[season_num] = sorted(set(season_episodes[season_num]))
|
||||
|
||||
return item_id, season_episodes
|
||||
|
||||
def refresh_root_library(self, scan_mode: Optional[Union[str, int]] = None) -> Optional[bool]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
if not self._libraries:
|
||||
self.get_librarys()
|
||||
|
||||
scan_type = (
|
||||
self.__resolve_scan_type(scan_mode=scan_mode)
|
||||
if scan_mode is not None
|
||||
else self._scan_type
|
||||
)
|
||||
results = []
|
||||
for lib_id in self._libraries.keys():
|
||||
logger.info(
|
||||
f"刷新媒体库:{self._libraries[lib_id].get('name')}(扫描模式: {scan_type})"
|
||||
)
|
||||
results.append(self.__scan_library(library_id=lib_id, scan_type=scan_type))
|
||||
|
||||
return all(results) if results else True
|
||||
|
||||
def __match_library_id_by_path(self, path: Optional[Path]) -> Optional[str]:
|
||||
if path is None:
|
||||
return None
|
||||
|
||||
path_str = self.__normalize_dir_path(path)
|
||||
if not self._library_paths:
|
||||
self.get_librarys()
|
||||
|
||||
for lib_id, lib_path in self._library_paths.items():
|
||||
if self.__is_subpath(path_str, lib_path):
|
||||
return lib_id
|
||||
return None
|
||||
|
||||
def refresh_library_by_items(
|
||||
self,
|
||||
items: List[schemas.RefreshMediaItem],
|
||||
scan_mode: Optional[Union[str, int]] = None,
|
||||
) -> Optional[bool]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
scan_type = (
|
||||
self.__resolve_scan_type(scan_mode=scan_mode)
|
||||
if scan_mode is not None
|
||||
else self._scan_type
|
||||
)
|
||||
library_ids = set()
|
||||
for item in items:
|
||||
library_id = self.__match_library_id_by_path(item.target_path)
|
||||
if library_id is None:
|
||||
return self.refresh_root_library(scan_mode=scan_mode)
|
||||
library_ids.add(library_id)
|
||||
|
||||
for library_id in library_ids:
|
||||
lib_name = self._libraries.get(library_id, {}).get("name", library_id)
|
||||
logger.info(f"刷新媒体库:{lib_name}(扫描模式: {scan_type})")
|
||||
if not self.__scan_library(library_id=library_id, scan_type=scan_type):
|
||||
return self.refresh_root_library(scan_mode=scan_mode)
|
||||
|
||||
return True
|
||||
|
||||
def get_webhook_message(self, body: Any) -> Optional[schemas.WebhookEventInfo]:
|
||||
return None
|
||||
|
||||
def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]:
|
||||
if not self.is_authenticated() or not self._api or not itemid:
|
||||
return None
|
||||
|
||||
info = self._api.recently_played_info(itemid)
|
||||
if not info:
|
||||
return None
|
||||
|
||||
video_info = info.get("video_info") if isinstance(info.get("video_info"), dict) else None
|
||||
if not video_info or not video_info.get("ug_video_info_id"):
|
||||
return None
|
||||
|
||||
return self.__build_media_server_item(video_info, info.get("play_status"))
|
||||
|
||||
def _iter_library_videos(self, root_path: str, page_size: int = 100):
|
||||
if not self._api or not root_path:
|
||||
return
|
||||
|
||||
queue = deque([root_path])
|
||||
visited: set[str] = set()
|
||||
max_paths = 20000
|
||||
|
||||
while queue and len(visited) < max_paths:
|
||||
current_path = queue.popleft()
|
||||
if current_path in visited:
|
||||
continue
|
||||
visited.add(current_path)
|
||||
|
||||
page = 1
|
||||
while True:
|
||||
data = self._api.poster_wall_get_folder(
|
||||
path=current_path,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
sort_type=1,
|
||||
order_type=1,
|
||||
)
|
||||
if not data:
|
||||
break
|
||||
|
||||
for video in data.get("video_arr") or []:
|
||||
if isinstance(video, dict):
|
||||
yield video
|
||||
|
||||
for folder in data.get("folder_arr") or []:
|
||||
if not isinstance(folder, dict):
|
||||
continue
|
||||
sub_path = folder.get("path")
|
||||
if sub_path and sub_path not in visited:
|
||||
queue.append(str(sub_path))
|
||||
|
||||
if data.get("is_last_page") is True:
|
||||
break
|
||||
page += 1
|
||||
|
||||
def get_items(
|
||||
self,
|
||||
parent: Union[str, int],
|
||||
start_index: Optional[int] = 0,
|
||||
limit: Optional[int] = -1,
|
||||
) -> Generator[schemas.MediaServerItem | None | Any, Any, None]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
library_id = str(parent)
|
||||
if not self._library_paths:
|
||||
self.get_librarys()
|
||||
|
||||
root_path = self._library_paths.get(library_id)
|
||||
if not root_path:
|
||||
return None
|
||||
|
||||
skip = max(0, start_index or 0)
|
||||
remain = -1 if limit in [None, -1] else max(0, limit)
|
||||
|
||||
for video in self._iter_library_videos(root_path=root_path):
|
||||
video_type = video.get("type")
|
||||
if video_type not in [1, 2]:
|
||||
continue
|
||||
|
||||
if skip > 0:
|
||||
skip -= 1
|
||||
continue
|
||||
|
||||
item = self.__build_media_server_item(video)
|
||||
if item:
|
||||
yield item
|
||||
if remain != -1:
|
||||
remain -= 1
|
||||
if remain <= 0:
|
||||
break
|
||||
|
||||
return None
|
||||
|
||||
def get_play_url(self, item_id: str) -> Optional[str]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
info = self._api.recently_played_info(item_id)
|
||||
if not info:
|
||||
return None
|
||||
|
||||
video_info = info.get("video_info") if isinstance(info.get("video_info"), dict) else None
|
||||
if not video_info:
|
||||
return None
|
||||
|
||||
return self.__build_play_url(
|
||||
item_id=item_id,
|
||||
video_type=video_info.get("type"),
|
||||
media_lib_set_id=video_info.get("media_lib_set_id"),
|
||||
)
|
||||
|
||||
def get_resume(self, num: Optional[int] = 12) -> Optional[List[schemas.MediaServerPlayItem]]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
page_size = max(1, num or 12)
|
||||
data = self._api.recently_played(page=1, page_size=page_size)
|
||||
if not data:
|
||||
return []
|
||||
|
||||
ret_resume = []
|
||||
for item in data.get("video_arr") or []:
|
||||
if len(ret_resume) == page_size:
|
||||
break
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
video_info = item.get("video_info") if isinstance(item.get("video_info"), dict) else {}
|
||||
library_id = str(video_info.get("media_lib_set_id") or "")
|
||||
if self.__is_library_blocked(library_id):
|
||||
continue
|
||||
play_item = self.__build_play_item_from_wrapper(item)
|
||||
if play_item:
|
||||
ret_resume.append(play_item)
|
||||
|
||||
return ret_resume
|
||||
|
||||
def get_latest(self, num: int = 20) -> Optional[List[schemas.MediaServerPlayItem]]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
page_size = max(1, num)
|
||||
data = self._api.recently_updated(page=1, page_size=page_size)
|
||||
if not data:
|
||||
return []
|
||||
|
||||
latest = []
|
||||
for item in data.get("video_arr") or []:
|
||||
if len(latest) == page_size:
|
||||
break
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
video_info = item.get("video_info") if isinstance(item.get("video_info"), dict) else {}
|
||||
library_id = str(video_info.get("media_lib_set_id") or "")
|
||||
if self.__is_library_blocked(library_id):
|
||||
continue
|
||||
play_item = self.__build_play_item_from_wrapper(item)
|
||||
if play_item:
|
||||
latest.append(play_item)
|
||||
|
||||
return latest
|
||||
|
||||
def get_latest_backdrops(self, num: int = 20, remote: bool = False) -> Optional[List[str]]:
|
||||
if not self.is_authenticated() or not self._api:
|
||||
return None
|
||||
|
||||
data = self._api.recently_updated(page=1, page_size=max(1, num))
|
||||
if not data:
|
||||
return []
|
||||
|
||||
images: List[str] = []
|
||||
for item in data.get("video_arr") or []:
|
||||
if len(images) == num:
|
||||
break
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
|
||||
video_info = item.get("video_info") if isinstance(item.get("video_info"), dict) else {}
|
||||
library_id = str(video_info.get("media_lib_set_id") or "")
|
||||
if self.__is_library_blocked(library_id):
|
||||
continue
|
||||
|
||||
image = self.__resolve_image(video_info.get("backdrop_path")) or self.__resolve_image(
|
||||
video_info.get("poster_path")
|
||||
)
|
||||
if image:
|
||||
images.append(image)
|
||||
|
||||
return images
|
||||
|
||||
def get_image_cookies(self, image_url: str):
|
||||
# 绿联图片流接口依赖加密鉴权头,当前图片代理仅支持Cookie注入。
|
||||
return None
|
||||
@@ -14,7 +14,7 @@ class ExistMediaInfo(BaseModel):
|
||||
type: Optional[MediaType] = None
|
||||
# 季
|
||||
seasons: Optional[Dict[int, list]] = Field(default_factory=dict)
|
||||
# 媒体服务器类型:plex、jellyfin、emby、trimemedia
|
||||
# 媒体服务器类型:plex、jellyfin、emby、trimemedia、ugreen
|
||||
server_type: Optional[str] = None
|
||||
# 媒体服务器名称
|
||||
server: Optional[str] = None
|
||||
|
||||
@@ -29,7 +29,7 @@ class MediaServerConf(BaseModel):
|
||||
|
||||
# 名称
|
||||
name: Optional[str] = None
|
||||
# 类型 emby/jellyfin/plex
|
||||
# 类型 emby/jellyfin/plex/trimemedia/ugreen
|
||||
type: Optional[str] = None
|
||||
# 配置
|
||||
config: Optional[dict] = Field(default_factory=dict)
|
||||
|
||||
@@ -219,6 +219,8 @@ class SystemConfigKey(Enum):
|
||||
PluginInstallReport = "PluginInstallReport"
|
||||
# 配置向导状态
|
||||
SetupWizardState = "SetupWizardState"
|
||||
# 绿联影视登录会话缓存
|
||||
UgreenSessionCache = "UgreenSessionCache"
|
||||
|
||||
|
||||
# 处理进度Key字典
|
||||
@@ -309,6 +311,8 @@ class MediaServerType(Enum):
|
||||
Plex = "Plex"
|
||||
# 飞牛影视
|
||||
TrimeMedia = "TrimeMedia"
|
||||
# 绿联影视
|
||||
Ugreen = "Ugreen"
|
||||
|
||||
|
||||
# 识别器类型
|
||||
|
||||
242
app/utils/ugreen_crypto.py
Normal file
242
app/utils/ugreen_crypto.py
Normal file
@@ -0,0 +1,242 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Mapping, Sequence
|
||||
from urllib.parse import quote, urlencode, urlsplit, urlunsplit
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
|
||||
|
||||
@dataclass
|
||||
class UgreenEncryptedRequest:
|
||||
url: str
|
||||
headers: dict[str, str]
|
||||
params: dict[str, str]
|
||||
json: dict[str, Any] | None
|
||||
aes_key: str
|
||||
plain_query: str
|
||||
|
||||
|
||||
class UgreenCrypto:
|
||||
"""
|
||||
绿联接口请求加解密工具。
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
public_key: str,
|
||||
token: str | None = None,
|
||||
client_id: str | None = None,
|
||||
client_version: str | None = "76363",
|
||||
ug_agent: str | None = "PC/WEB",
|
||||
language: str = "zh-CN",
|
||||
) -> None:
|
||||
self.public_key_pem = self.normalize_public_key(public_key)
|
||||
self.public_key = serialization.load_pem_public_key(
|
||||
self.public_key_pem.encode("utf-8")
|
||||
)
|
||||
self.token = token
|
||||
self.client_id = client_id
|
||||
self.client_version = client_version
|
||||
self.ug_agent = ug_agent
|
||||
self.language = language
|
||||
|
||||
@staticmethod
|
||||
def normalize_public_key(public_key: str) -> str:
|
||||
key = (public_key or "").strip().strip('"').replace("\\n", "\n")
|
||||
if "BEGIN" in key:
|
||||
return key if key.endswith("\n") else f"{key}\n"
|
||||
return (
|
||||
"-----BEGIN RSA PUBLIC KEY-----\n"
|
||||
f"{key}\n"
|
||||
"-----END RSA PUBLIC KEY-----\n"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def generate_aes_key() -> str:
|
||||
return uuid.uuid4().hex
|
||||
|
||||
@staticmethod
|
||||
def _flatten_query(prefix: str, value: Any) -> list[tuple[str, str]]:
|
||||
pairs: list[tuple[str, str]] = []
|
||||
if isinstance(value, Mapping):
|
||||
for key, item in value.items():
|
||||
next_prefix = f"{prefix}[{key}]" if prefix else str(key)
|
||||
pairs.extend(UgreenCrypto._flatten_query(next_prefix, item))
|
||||
return pairs
|
||||
if isinstance(value, Sequence) and not isinstance(
|
||||
value, (str, bytes, bytearray)
|
||||
):
|
||||
for item in value:
|
||||
next_prefix = f"{prefix}[]"
|
||||
pairs.extend(UgreenCrypto._flatten_query(next_prefix, item))
|
||||
return pairs
|
||||
if isinstance(value, bool):
|
||||
pairs.append((prefix, "true" if value else "false"))
|
||||
return pairs
|
||||
if value is None:
|
||||
pairs.append((prefix, ""))
|
||||
return pairs
|
||||
pairs.append((prefix, str(value)))
|
||||
return pairs
|
||||
|
||||
@classmethod
|
||||
def encode_query(cls, params: Mapping[str, Any] | None) -> str:
|
||||
if not params:
|
||||
return ""
|
||||
pairs: list[tuple[str, str]] = []
|
||||
for key, value in params.items():
|
||||
pairs.extend(cls._flatten_query(str(key), value))
|
||||
return urlencode(pairs, doseq=False, quote_via=quote, safe="")
|
||||
|
||||
def rsa_encrypt_long(self, plaintext: str) -> str:
|
||||
if not plaintext:
|
||||
return ""
|
||||
key_size = self.public_key.key_size // 8
|
||||
max_chunk = key_size - 11
|
||||
encrypted_chunks: list[bytes] = []
|
||||
raw = plaintext.encode("utf-8")
|
||||
for start in range(0, len(raw), max_chunk):
|
||||
chunk = raw[start : start + max_chunk]
|
||||
encrypted_chunks.append(
|
||||
self.public_key.encrypt(chunk, padding.PKCS1v15())
|
||||
)
|
||||
return base64.b64encode(b"".join(encrypted_chunks)).decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def aes_gcm_encrypt(plaintext: str, aes_key: str) -> str:
|
||||
iv = os.urandom(12)
|
||||
cipher = AESGCM(aes_key.encode("utf-8"))
|
||||
encrypted = cipher.encrypt(iv, plaintext.encode("utf-8"), None)
|
||||
# encrypt 返回 ciphertext + tag
|
||||
return base64.b64encode(iv + encrypted).decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def aes_gcm_decrypt(payload_b64: str, aes_key: str) -> str:
|
||||
raw = base64.b64decode(payload_b64)
|
||||
iv = raw[:12]
|
||||
encrypted = raw[12:]
|
||||
cipher = AESGCM(aes_key.encode("utf-8"))
|
||||
plain = cipher.decrypt(iv, encrypted, None)
|
||||
return plain.decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def build_security_key(token: str) -> str:
|
||||
return hashlib.md5(token.encode("utf-8")).hexdigest()
|
||||
|
||||
@staticmethod
|
||||
def _normalize_body(data: Any) -> str:
|
||||
if isinstance(data, str):
|
||||
return data
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
return bytes(data).decode("utf-8")
|
||||
return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
|
||||
|
||||
def encrypt_body(self, data: Any, aes_key: str) -> dict[str, str]:
|
||||
plain = self._normalize_body(data)
|
||||
return {
|
||||
"encrypt_req_body": self.aes_gcm_encrypt(plain, aes_key),
|
||||
"req_body_sha256": hashlib.sha256(plain.encode("utf-8")).hexdigest(),
|
||||
}
|
||||
|
||||
def build_headers(
|
||||
self,
|
||||
aes_key: str,
|
||||
token: str | None = None,
|
||||
extra_headers: Mapping[str, str] | None = None,
|
||||
encrypt_token: bool = True,
|
||||
) -> dict[str, str]:
|
||||
token_value = token if token is not None else self.token
|
||||
headers: dict[str, str] = dict(extra_headers or {})
|
||||
|
||||
if self.client_id:
|
||||
headers.setdefault("Client-Id", self.client_id)
|
||||
if self.client_version:
|
||||
headers.setdefault("Client-Version", self.client_version)
|
||||
if self.ug_agent:
|
||||
headers.setdefault("UG-Agent", self.ug_agent)
|
||||
headers.setdefault("X-Specify-Language", self.language)
|
||||
headers.setdefault("Accept", "application/json, text/plain, */*")
|
||||
|
||||
if token_value:
|
||||
headers["X-Ugreen-Security-Key"] = self.build_security_key(token_value)
|
||||
headers["X-Ugreen-Security-Code"] = self.rsa_encrypt_long(aes_key)
|
||||
headers["X-Ugreen-Token"] = (
|
||||
self.rsa_encrypt_long(token_value) if encrypt_token else token_value
|
||||
)
|
||||
return headers
|
||||
|
||||
def build_encrypted_request(
|
||||
self,
|
||||
url: str,
|
||||
method: str = "GET",
|
||||
params: Mapping[str, Any] | None = None,
|
||||
data: Any | None = None,
|
||||
extra_headers: Mapping[str, str] | None = None,
|
||||
token: str | None = None,
|
||||
encrypt_token: bool = True,
|
||||
encrypt_body: bool = True,
|
||||
) -> UgreenEncryptedRequest:
|
||||
"""
|
||||
构建绿联加密请求。
|
||||
|
||||
关键点:
|
||||
- 传入的是明文 `params`;
|
||||
- 方法内部会将其序列化并加密成 `encrypt_query`;
|
||||
- 业务侧不需要、也不应该手工拼接 `encrypt_query`。
|
||||
"""
|
||||
parsed = urlsplit(url)
|
||||
clean_url = urlunsplit(
|
||||
(parsed.scheme, parsed.netloc, parsed.path, "", parsed.fragment)
|
||||
)
|
||||
|
||||
url_query_plain = parsed.query
|
||||
input_query_plain = self.encode_query(params)
|
||||
plain_query = "&".join(filter(None, [url_query_plain, input_query_plain]))
|
||||
|
||||
aes_key = self.generate_aes_key()
|
||||
encrypted_query = self.aes_gcm_encrypt(plain_query, aes_key)
|
||||
|
||||
req_json = None
|
||||
if data is not None:
|
||||
req_json = self.encrypt_body(data, aes_key) if encrypt_body else data
|
||||
|
||||
headers = self.build_headers(
|
||||
aes_key=aes_key,
|
||||
token=token,
|
||||
extra_headers=extra_headers,
|
||||
encrypt_token=encrypt_token,
|
||||
)
|
||||
if req_json is not None:
|
||||
headers.setdefault("Content-Type", "application/json")
|
||||
|
||||
_ = method # 保留参数,便于上层统一调用
|
||||
|
||||
return UgreenEncryptedRequest(
|
||||
url=clean_url,
|
||||
headers=headers,
|
||||
# 绿联接口约定:查询参数统一透传为 encrypt_query
|
||||
params={"encrypt_query": encrypted_query},
|
||||
json=req_json,
|
||||
aes_key=aes_key,
|
||||
plain_query=plain_query,
|
||||
)
|
||||
|
||||
def decrypt_response(self, response_json: Any, aes_key: str) -> Any:
|
||||
if not isinstance(response_json, Mapping):
|
||||
return response_json
|
||||
encrypted = response_json.get("encrypt_resp_body")
|
||||
if not encrypted:
|
||||
return response_json
|
||||
plain = self.aes_gcm_decrypt(str(encrypted), aes_key)
|
||||
try:
|
||||
return json.loads(plain)
|
||||
except json.JSONDecodeError:
|
||||
return plain
|
||||
299
tests/manual/ugreen_media_cli.py
Normal file
299
tests/manual/ugreen_media_cli.py
Normal file
@@ -0,0 +1,299 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import getpass
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
from typing import Any, Mapping
|
||||
from urllib.parse import urlsplit, urlunsplit
|
||||
|
||||
# 兼容直接运行脚本:避免 app/utils 被放在 sys.path 首位导致标准库模块被同名文件遮蔽
|
||||
if __name__ == "__main__" and __package__ is None:
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
project_root = os.path.abspath(os.path.join(script_dir, "..", ".."))
|
||||
if script_dir in sys.path:
|
||||
sys.path.remove(script_dir)
|
||||
if project_root not in sys.path:
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
import requests
|
||||
|
||||
from app.utils.ugreen_crypto import UgreenCrypto
|
||||
|
||||
|
||||
class UgreenLoginError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _normalize_base_url(raw: str) -> str:
|
||||
value = (raw or "").strip()
|
||||
if not value:
|
||||
raise UgreenLoginError("服务器地址不能为空")
|
||||
if not value.startswith(("http://", "https://")):
|
||||
value = f"http://{value}"
|
||||
parsed = urlsplit(value)
|
||||
if not parsed.netloc:
|
||||
raise UgreenLoginError(f"无效服务器地址: {raw}")
|
||||
return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/")
|
||||
|
||||
|
||||
def _json_or_raise(resp: requests.Response, stage: str) -> dict[str, Any]:
|
||||
try:
|
||||
data = resp.json()
|
||||
except Exception as exc: # pragma: no cover - 网络异常路径
|
||||
raise UgreenLoginError(
|
||||
f"{stage} 返回非 JSON,HTTP {resp.status_code},响应片段: {resp.text[:200]}"
|
||||
) from exc
|
||||
if not isinstance(data, dict):
|
||||
raise UgreenLoginError(f"{stage} 返回格式异常: {type(data).__name__}")
|
||||
return data
|
||||
|
||||
|
||||
def _decode_public_key(raw: str) -> str:
|
||||
value = (raw or "").strip()
|
||||
if not value:
|
||||
raise UgreenLoginError("未获取到公钥")
|
||||
if "BEGIN" in value:
|
||||
return value
|
||||
try:
|
||||
return base64.b64decode(value).decode("utf-8")
|
||||
except Exception as exc:
|
||||
raise UgreenLoginError("公钥解码失败") from exc
|
||||
|
||||
|
||||
def _raise_if_failed(payload: Mapping[str, Any], stage: str) -> None:
|
||||
if payload.get("code") == 200:
|
||||
return
|
||||
raise UgreenLoginError(
|
||||
f"{stage}失败: code={payload.get('code')} msg={payload.get('msg')}"
|
||||
)
|
||||
|
||||
|
||||
def _build_common_headers(
|
||||
client_id: str, client_version: str, language: str
|
||||
) -> dict[str, str]:
|
||||
return {
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"Client-Id": client_id,
|
||||
"Client-Version": client_version,
|
||||
"UG-Agent": "PC/WEB",
|
||||
"X-Specify-Language": language,
|
||||
}
|
||||
|
||||
|
||||
def _login_and_get_access(
|
||||
session: requests.Session,
|
||||
base_url: str,
|
||||
username: str,
|
||||
password: str,
|
||||
keepalive: bool,
|
||||
headers: Mapping[str, str],
|
||||
timeout: float,
|
||||
verify_ssl: bool,
|
||||
) -> tuple[str, str]:
|
||||
check_resp = session.post(
|
||||
f"{base_url}/ugreen/v1/verify/check",
|
||||
json={"username": username},
|
||||
headers=dict(headers),
|
||||
timeout=timeout,
|
||||
verify=verify_ssl,
|
||||
)
|
||||
check_json = _json_or_raise(check_resp, "获取登录公钥")
|
||||
_raise_if_failed(check_json, "获取登录公钥")
|
||||
|
||||
rsa_token = (
|
||||
check_resp.headers.get("x-rsa-token")
|
||||
or check_resp.headers.get("X-Rsa-Token")
|
||||
or check_json.get("xRsaToken")
|
||||
or check_json.get("x-rsa-token")
|
||||
)
|
||||
if not rsa_token:
|
||||
data = check_json.get("data")
|
||||
if isinstance(data, Mapping):
|
||||
rsa_token = data.get("xRsaToken") or data.get("x-rsa-token")
|
||||
if not rsa_token:
|
||||
raise UgreenLoginError("登录公钥为空(x-rsa-token)")
|
||||
|
||||
login_public_key = _decode_public_key(str(rsa_token))
|
||||
encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long(
|
||||
password
|
||||
)
|
||||
|
||||
login_payload = {
|
||||
"username": username,
|
||||
"password": encrypted_password,
|
||||
"keepalive": keepalive,
|
||||
"otp": True,
|
||||
"is_simple": True,
|
||||
}
|
||||
login_resp = session.post(
|
||||
f"{base_url}/ugreen/v1/verify/login",
|
||||
json=login_payload,
|
||||
headers=dict(headers),
|
||||
timeout=timeout,
|
||||
verify=verify_ssl,
|
||||
)
|
||||
login_json = _json_or_raise(login_resp, "登录")
|
||||
_raise_if_failed(login_json, "登录")
|
||||
|
||||
data = login_json.get("data")
|
||||
if not isinstance(data, Mapping):
|
||||
raise UgreenLoginError("登录成功但响应 data 为空")
|
||||
|
||||
token = str(data.get("token") or "").strip()
|
||||
public_key = str(data.get("public_key") or "").strip()
|
||||
if not token:
|
||||
raise UgreenLoginError("登录成功但未拿到 token")
|
||||
if not public_key:
|
||||
raise UgreenLoginError("登录成功但未拿到 public_key")
|
||||
return token, _decode_public_key(public_key)
|
||||
|
||||
|
||||
def _fetch_media_lib(
|
||||
session: requests.Session,
|
||||
base_url: str,
|
||||
token: str,
|
||||
public_key: str,
|
||||
client_id: str,
|
||||
client_version: str,
|
||||
language: str,
|
||||
page: int,
|
||||
page_size: int,
|
||||
timeout: float,
|
||||
verify_ssl: bool,
|
||||
) -> Any:
|
||||
crypto = UgreenCrypto(
|
||||
public_key=public_key,
|
||||
token=token,
|
||||
client_id=client_id,
|
||||
client_version=client_version,
|
||||
ug_agent="PC/WEB",
|
||||
language=language,
|
||||
)
|
||||
req = crypto.build_encrypted_request(
|
||||
url=f"{base_url}/ugreen/v1/video/homepage/media_list",
|
||||
method="GET",
|
||||
params={"page": page, "page_size": page_size},
|
||||
)
|
||||
media_resp = session.get(
|
||||
req.url,
|
||||
headers=req.headers,
|
||||
params=req.params,
|
||||
timeout=timeout,
|
||||
verify=verify_ssl,
|
||||
)
|
||||
media_json = _json_or_raise(media_resp, "获取媒体库")
|
||||
return crypto.decrypt_response(media_json, req.aes_key)
|
||||
|
||||
|
||||
def parse_args(argv: list[str]) -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="登录绿联 NAS 并调用媒体库接口(自动处理请求加密/响应解密)"
|
||||
)
|
||||
parser.add_argument("--host", help="服务器地址,例如: http://192.168.20.101:9999")
|
||||
parser.add_argument("--username", help="用户名")
|
||||
parser.add_argument("--password", help="密码(不传则交互输入)")
|
||||
parser.add_argument("--client-id", help="可选,默认自动生成 UUID-WEB")
|
||||
parser.add_argument("--client-version", default="76363", help="默认: 76363")
|
||||
parser.add_argument("--language", default="zh-CN", help="默认: zh-CN")
|
||||
parser.add_argument("--page", type=int, default=1, help="默认: 1")
|
||||
parser.add_argument("--page-size", type=int, default=50, help="默认: 50")
|
||||
parser.add_argument("--timeout", type=float, default=20.0, help="默认: 20 秒")
|
||||
parser.add_argument("--insecure", action="store_true", help="忽略 HTTPS 证书校验")
|
||||
parser.add_argument(
|
||||
"--no-keepalive",
|
||||
action="store_true",
|
||||
help="关闭保持登录(默认保持登录)",
|
||||
)
|
||||
parser.add_argument("--pretty", action="store_true", help="美化输出 JSON")
|
||||
parser.add_argument("--output", help="将解密后的结果写入文件")
|
||||
return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = parse_args(argv or sys.argv[1:])
|
||||
|
||||
host = args.host or input("服务器地址: ").strip()
|
||||
username = args.username or input("用户名: ").strip()
|
||||
password = args.password or getpass.getpass("密码: ")
|
||||
client_id = (args.client_id or f"{uuid.uuid4()}-WEB").strip()
|
||||
keepalive = not args.no_keepalive
|
||||
verify_ssl = not args.insecure
|
||||
|
||||
try:
|
||||
base_url = _normalize_base_url(host)
|
||||
if args.insecure:
|
||||
requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined]
|
||||
|
||||
session = requests.Session()
|
||||
headers = _build_common_headers(
|
||||
client_id=client_id,
|
||||
client_version=args.client_version,
|
||||
language=args.language,
|
||||
)
|
||||
|
||||
token, public_key = _login_and_get_access(
|
||||
session=session,
|
||||
base_url=base_url,
|
||||
username=username,
|
||||
password=password,
|
||||
keepalive=keepalive,
|
||||
headers=headers,
|
||||
timeout=args.timeout,
|
||||
verify_ssl=verify_ssl,
|
||||
)
|
||||
decoded = _fetch_media_lib(
|
||||
session=session,
|
||||
base_url=base_url,
|
||||
token=token,
|
||||
public_key=public_key,
|
||||
client_id=client_id,
|
||||
client_version=args.client_version,
|
||||
language=args.language,
|
||||
page=args.page,
|
||||
page_size=args.page_size,
|
||||
timeout=args.timeout,
|
||||
verify_ssl=verify_ssl,
|
||||
)
|
||||
|
||||
if isinstance(decoded, Mapping):
|
||||
if decoded.get("code") != 200:
|
||||
raise UgreenLoginError(
|
||||
f"媒体库接口失败: code={decoded.get('code')} msg={decoded.get('msg')}"
|
||||
)
|
||||
media_count = None
|
||||
data = decoded.get("data")
|
||||
if isinstance(data, Mapping) and isinstance(data.get("media_lib_info_list"), list):
|
||||
media_count = len(data["media_lib_info_list"])
|
||||
print(
|
||||
f"调用成功: code={decoded.get('code')} msg={decoded.get('msg')} "
|
||||
f"media_lib_info_list={media_count}"
|
||||
)
|
||||
|
||||
text = json.dumps(
|
||||
decoded,
|
||||
ensure_ascii=False,
|
||||
indent=2 if args.pretty else None,
|
||||
separators=(",", ":") if not args.pretty else None,
|
||||
)
|
||||
if args.output:
|
||||
with open(args.output, "w", encoding="utf-8") as f:
|
||||
f.write(text)
|
||||
f.write("\n")
|
||||
print(f"解密结果已写入: {args.output}")
|
||||
else:
|
||||
print(text)
|
||||
return 0
|
||||
except UgreenLoginError as exc:
|
||||
print(f"错误: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
except requests.RequestException as exc:
|
||||
print(f"网络错误: {exc}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
95
tests/test_ugreen_crypto.py
Normal file
95
tests/test_ugreen_crypto.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import unittest
|
||||
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
||||
|
||||
from app.utils.ugreen_crypto import UgreenCrypto
|
||||
|
||||
|
||||
def _generate_rsa_keys() -> tuple[str, rsa.RSAPrivateKey]:
|
||||
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||||
public_pem = private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.PKCS1,
|
||||
).decode("utf-8")
|
||||
return public_pem, private_key
|
||||
|
||||
|
||||
def _rsa_decrypt_long(private_key: rsa.RSAPrivateKey, payload_b64: str) -> str:
|
||||
encrypted = base64.b64decode(payload_b64)
|
||||
chunk_size = private_key.key_size // 8
|
||||
plain_chunks = []
|
||||
for start in range(0, len(encrypted), chunk_size):
|
||||
chunk = encrypted[start : start + chunk_size]
|
||||
plain_chunks.append(private_key.decrypt(chunk, padding.PKCS1v15()))
|
||||
return b"".join(plain_chunks).decode("utf-8")
|
||||
|
||||
|
||||
class UgreenCryptoTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.public_key, self.private_key = _generate_rsa_keys()
|
||||
self.token = "demo-token-for-test"
|
||||
self.crypto = UgreenCrypto(
|
||||
public_key=self.public_key,
|
||||
token=self.token,
|
||||
client_id="test-client-id",
|
||||
)
|
||||
|
||||
def test_rsa_encrypt_long(self):
|
||||
plain = "A" * 400
|
||||
encrypted = self.crypto.rsa_encrypt_long(plain)
|
||||
self.assertEqual(plain, _rsa_decrypt_long(self.private_key, encrypted))
|
||||
|
||||
def test_build_encrypted_request_and_decrypt_response(self):
|
||||
req = self.crypto.build_encrypted_request(
|
||||
url="http://127.0.0.1:9999/ugreen/v1/video/homepage/media_list",
|
||||
params={"page": 1, "page_size": 50},
|
||||
data={"foo": "bar", "count": 2},
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
req.plain_query,
|
||||
"page=1&page_size=50",
|
||||
)
|
||||
self.assertEqual(
|
||||
req.plain_query,
|
||||
self.crypto.aes_gcm_decrypt(req.params["encrypt_query"], req.aes_key),
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
req.headers["X-Ugreen-Security-Key"],
|
||||
hashlib.md5(self.token.encode("utf-8")).hexdigest(),
|
||||
)
|
||||
self.assertEqual(
|
||||
req.aes_key,
|
||||
_rsa_decrypt_long(self.private_key, req.headers["X-Ugreen-Security-Code"]),
|
||||
)
|
||||
self.assertEqual(
|
||||
self.token,
|
||||
_rsa_decrypt_long(self.private_key, req.headers["X-Ugreen-Token"]),
|
||||
)
|
||||
|
||||
encrypted_body = req.json["encrypt_req_body"]
|
||||
body_plain = self.crypto.aes_gcm_decrypt(encrypted_body, req.aes_key)
|
||||
self.assertEqual(json.loads(body_plain), {"foo": "bar", "count": 2})
|
||||
self.assertEqual(
|
||||
req.json["req_body_sha256"],
|
||||
hashlib.sha256(body_plain.encode("utf-8")).hexdigest(),
|
||||
)
|
||||
|
||||
server_payload = {"code": 0, "msg": "ok", "data": {"items": [1, 2, 3]}}
|
||||
resp = {
|
||||
"encrypt_resp_body": self.crypto.aes_gcm_encrypt(
|
||||
json.dumps(server_payload, ensure_ascii=False, separators=(",", ":")),
|
||||
req.aes_key,
|
||||
)
|
||||
}
|
||||
decoded = self.crypto.decrypt_response(resp, req.aes_key)
|
||||
self.assertEqual(decoded, server_payload)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
176
tests/test_ugreen_mediaserver.py
Normal file
176
tests/test_ugreen_mediaserver.py
Normal file
@@ -0,0 +1,176 @@
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
import importlib.util
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
from app import schemas
|
||||
|
||||
try:
|
||||
from app.api.endpoints import dashboard as dashboard_endpoint
|
||||
except Exception:
|
||||
dashboard_endpoint = None
|
||||
|
||||
|
||||
def _load_ugreen_class():
|
||||
"""
|
||||
在测试中动态加载 Ugreen,避免受可选依赖(如 pyquery/sqlalchemy)影响。
|
||||
"""
|
||||
module_name = "_test_ugreen_module"
|
||||
if module_name in sys.modules:
|
||||
return sys.modules[module_name].Ugreen
|
||||
|
||||
# 轻量日志桩
|
||||
if "app.log" not in sys.modules:
|
||||
log_module = types.ModuleType("app.log")
|
||||
|
||||
class _Logger:
|
||||
def info(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def warning(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def error(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def debug(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
log_module.logger = _Logger()
|
||||
sys.modules["app.log"] = log_module
|
||||
|
||||
# SystemConfigOper 桩
|
||||
if "app.db.systemconfig_oper" not in sys.modules:
|
||||
db_module = types.ModuleType("app.db.systemconfig_oper")
|
||||
|
||||
class _SystemConfigOper:
|
||||
@staticmethod
|
||||
def get(_key):
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def set(_key, _value):
|
||||
return None
|
||||
|
||||
db_module.SystemConfigOper = _SystemConfigOper
|
||||
sys.modules["app.db.systemconfig_oper"] = db_module
|
||||
|
||||
# app.modules / app.modules.ugreen / app.modules.ugreen.api 桩
|
||||
if "app.modules" not in sys.modules:
|
||||
pkg = types.ModuleType("app.modules")
|
||||
pkg.__path__ = []
|
||||
sys.modules["app.modules"] = pkg
|
||||
if "app.modules.ugreen" not in sys.modules:
|
||||
subpkg = types.ModuleType("app.modules.ugreen")
|
||||
subpkg.__path__ = []
|
||||
sys.modules["app.modules.ugreen"] = subpkg
|
||||
if "app.modules.ugreen.api" not in sys.modules:
|
||||
api_module = types.ModuleType("app.modules.ugreen.api")
|
||||
|
||||
class _Api:
|
||||
host = ""
|
||||
token = None
|
||||
|
||||
api_module.Api = _Api
|
||||
sys.modules["app.modules.ugreen.api"] = api_module
|
||||
|
||||
ugreen_path = Path(__file__).resolve().parents[1] / "app" / "modules" / "ugreen" / "ugreen.py"
|
||||
spec = importlib.util.spec_from_file_location(module_name, ugreen_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
assert spec and spec.loader
|
||||
spec.loader.exec_module(module)
|
||||
return module.Ugreen
|
||||
|
||||
|
||||
Ugreen = _load_ugreen_class()
|
||||
|
||||
|
||||
class _FakeUgreenApi:
|
||||
host = "http://127.0.0.1:9999"
|
||||
token = "test-token"
|
||||
|
||||
@staticmethod
|
||||
def video_all(classification: int, page: int = 1, page_size: int = 1):
|
||||
if classification == -102:
|
||||
return {"total_num": 12}
|
||||
if classification == -103:
|
||||
return {"total_num": 34}
|
||||
return {"total_num": 0}
|
||||
|
||||
|
||||
class UgreenScanModeTest(unittest.TestCase):
|
||||
def test_resolve_scan_type(self):
|
||||
resolve = Ugreen._Ugreen__resolve_scan_type
|
||||
|
||||
self.assertEqual(resolve(scan_mode="new_and_modified"), 1)
|
||||
self.assertEqual(resolve(scan_mode="supplement_missing"), 2)
|
||||
self.assertEqual(resolve(scan_mode="full_override"), 3)
|
||||
|
||||
self.assertEqual(resolve(scan_mode="1"), 1)
|
||||
self.assertEqual(resolve(scan_mode="2"), 2)
|
||||
self.assertEqual(resolve(scan_mode="3"), 3)
|
||||
|
||||
self.assertEqual(resolve(scan_type=1), 1)
|
||||
self.assertEqual(resolve(scan_type=2), 2)
|
||||
self.assertEqual(resolve(scan_type=3), 3)
|
||||
|
||||
self.assertEqual(resolve(scan_mode="unknown"), 2)
|
||||
self.assertEqual(resolve(), 2)
|
||||
|
||||
|
||||
class UgreenStatisticTest(unittest.TestCase):
|
||||
def test_get_medias_count_episode_is_none(self):
|
||||
ugreen = Ugreen.__new__(Ugreen)
|
||||
ugreen._host = "http://127.0.0.1:9999"
|
||||
ugreen._username = "tester"
|
||||
ugreen._password = "secret"
|
||||
ugreen._userinfo = {"name": "tester"}
|
||||
ugreen._api = _FakeUgreenApi()
|
||||
|
||||
stat = ugreen.get_medias_count()
|
||||
self.assertEqual(stat.movie_count, 12)
|
||||
self.assertEqual(stat.tv_count, 34)
|
||||
self.assertIsNone(stat.episode_count)
|
||||
|
||||
|
||||
class DashboardStatisticTest(unittest.TestCase):
|
||||
@unittest.skipIf(dashboard_endpoint is None, "dashboard endpoint dependencies are missing")
|
||||
def test_statistic_all_episode_missing(self):
|
||||
mocked_stats = [
|
||||
schemas.Statistic(movie_count=10, tv_count=20, episode_count=None, user_count=2),
|
||||
schemas.Statistic(movie_count=1, tv_count=2, episode_count=None, user_count=1),
|
||||
]
|
||||
with patch(
|
||||
"app.api.endpoints.dashboard.DashboardChain.media_statistic",
|
||||
return_value=mocked_stats,
|
||||
):
|
||||
ret = dashboard_endpoint.statistic(name="ugreen", _=None)
|
||||
|
||||
self.assertEqual(ret.movie_count, 11)
|
||||
self.assertEqual(ret.tv_count, 22)
|
||||
self.assertEqual(ret.user_count, 3)
|
||||
self.assertIsNone(ret.episode_count)
|
||||
|
||||
@unittest.skipIf(dashboard_endpoint is None, "dashboard endpoint dependencies are missing")
|
||||
def test_statistic_mixed_episode_count(self):
|
||||
mocked_stats = [
|
||||
schemas.Statistic(movie_count=10, tv_count=20, episode_count=None, user_count=2),
|
||||
schemas.Statistic(movie_count=1, tv_count=2, episode_count=6, user_count=1),
|
||||
]
|
||||
with patch(
|
||||
"app.api.endpoints.dashboard.DashboardChain.media_statistic",
|
||||
return_value=mocked_stats,
|
||||
):
|
||||
ret = dashboard_endpoint.statistic(name="all", _=None)
|
||||
|
||||
self.assertEqual(ret.movie_count, 11)
|
||||
self.assertEqual(ret.tv_count, 22)
|
||||
self.assertEqual(ret.user_count, 3)
|
||||
self.assertEqual(ret.episode_count, 6)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user