diff --git a/app/api/endpoints/dashboard.py b/app/api/endpoints/dashboard.py index 3063d94e..6500952b 100644 --- a/app/api/endpoints/dashboard.py +++ b/app/api/endpoints/dashboard.py @@ -26,11 +26,17 @@ def statistic(name: Optional[str] = None, _: schemas.TokenPayload = Depends(veri if media_statistics: # 汇总各媒体库统计信息 ret_statistic = schemas.Statistic() + has_episode_count = False for media_statistic in media_statistics: - ret_statistic.movie_count += media_statistic.movie_count - ret_statistic.tv_count += media_statistic.tv_count - ret_statistic.episode_count += media_statistic.episode_count - ret_statistic.user_count += media_statistic.user_count + ret_statistic.movie_count += media_statistic.movie_count or 0 + ret_statistic.tv_count += media_statistic.tv_count or 0 + ret_statistic.user_count += media_statistic.user_count or 0 + if media_statistic.episode_count is not None: + ret_statistic.episode_count += media_statistic.episode_count or 0 + has_episode_count = True + if not has_episode_count: + # 所有媒体服务都未提供剧集统计时,返回 None 供前端展示“未获取”。 + ret_statistic.episode_count = None return ret_statistic else: return schemas.Statistic() diff --git a/app/modules/ugreen/__init__.py b/app/modules/ugreen/__init__.py new file mode 100644 index 00000000..59d0c9de --- /dev/null +++ b/app/modules/ugreen/__init__.py @@ -0,0 +1,358 @@ +from typing import Any, Generator, List, Optional, Tuple, Union + +from app import schemas +from app.core.context import MediaInfo +from app.core.event import eventmanager +from app.log import logger +from app.modules import _MediaServerBase, _ModuleBase +from app.modules.ugreen.ugreen import Ugreen +from app.schemas import AuthCredentials, AuthInterceptCredentials +from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType + + +class UgreenModule(_ModuleBase, _MediaServerBase[Ugreen]): + + def init_module(self) -> None: + """ + 初始化模块 + """ + super().init_service( + service_name=Ugreen.__name__.lower(), + service_type=lambda conf: Ugreen( + **conf.config, sync_libraries=conf.sync_libraries + ), + ) + + @staticmethod + def get_name() -> str: + return "绿联影视" + + @staticmethod + def get_type() -> ModuleType: + """ + 获取模块类型 + """ + return ModuleType.MediaServer + + @staticmethod + def get_subtype() -> MediaServerType: + """ + 获取模块子类型 + """ + return MediaServerType.Ugreen + + @staticmethod + def get_priority() -> int: + """ + 获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效 + """ + return 5 + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def scheduler_job(self) -> None: + """ + 定时任务,每10分钟调用一次 + """ + for name, server in self.get_instances().items(): + if server.is_configured() and server.is_inactive(): + logger.info(f"绿联影视 {name} 连接断开,尝试重连 ...") + server.reconnect() + + def stop(self): + for server in self.get_instances().values(): + if server.is_authenticated(): + server.disconnect() + + def test(self) -> Optional[Tuple[bool, str]]: + """ + 测试模块连接性 + """ + if not self.get_instances(): + return None + for name, server in self.get_instances().items(): + if not server.is_configured(): + return False, f"绿联影视配置不完整:{name}" + if server.is_inactive() and not server.reconnect(): + return False, f"无法连接绿联影视:{name}" + return True, "" + + def user_authenticate( + self, credentials: AuthCredentials, service_name: Optional[str] = None + ) -> Optional[AuthCredentials]: + """ + 使用绿联影视用户辅助完成用户认证 + """ + if not credentials or credentials.grant_type != "password": + return None + + if service_name: + servers = ( + [(service_name, server)] + if (server := self.get_instance(service_name)) + else [] + ) + else: + servers = self.get_instances().items() + + for name, server in servers: + intercept_event = eventmanager.send_event( + etype=ChainEventType.AuthIntercept, + data=AuthInterceptCredentials( + username=credentials.username, + channel=self.get_name(), + service=name, + status="triggered", + ), + ) + if intercept_event and intercept_event.event_data: + intercept_data: AuthInterceptCredentials = intercept_event.event_data + if intercept_data.cancel: + continue + token = server.authenticate(credentials.username, credentials.password) + if token: + credentials.channel = self.get_name() + credentials.service = name + credentials.token = token + return credentials + return None + + def webhook_parser( + self, body: Any, form: Any, args: Any + ) -> Optional[schemas.WebhookEventInfo]: + """ + 解析Webhook报文体 + """ + source = args.get("source") + if source: + server: Optional[Ugreen] = self.get_instance(source) + if not server: + return None + result = server.get_webhook_message(body) + if result: + result.server_name = source + return result + + for server in self.get_instances().values(): + if server: + result = server.get_webhook_message(body) + if result: + return result + return None + + def media_exists( + self, + mediainfo: MediaInfo, + itemid: Optional[str] = None, + server: Optional[str] = None, + ) -> Optional[schemas.ExistMediaInfo]: + """ + 判断媒体文件是否存在 + """ + if server: + servers = [(server, self.get_instance(server))] + else: + servers = self.get_instances().items() + + for name, s in servers: + if not s: + continue + if mediainfo.type == MediaType.MOVIE: + if itemid: + movie = s.get_iteminfo(itemid) + if movie: + logger.info(f"媒体库 {name} 中找到了 {movie}") + return schemas.ExistMediaInfo( + type=MediaType.MOVIE, + server_type="ugreen", + server=name, + itemid=movie.item_id, + ) + movies = s.get_movies( + title=mediainfo.title, + year=mediainfo.year, + tmdb_id=mediainfo.tmdb_id, + ) + if not movies: + logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中") + continue + logger.info(f"媒体库 {name} 中找到了 {movies}") + return schemas.ExistMediaInfo( + type=MediaType.MOVIE, + server_type="ugreen", + server=name, + itemid=movies[0].item_id, + ) + + itemid, tvs = s.get_tv_episodes( + title=mediainfo.title, + year=mediainfo.year, + tmdb_id=mediainfo.tmdb_id, + item_id=itemid, + ) + if not tvs: + logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中") + continue + logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}") + return schemas.ExistMediaInfo( + type=MediaType.TV, + seasons=tvs, + server_type="ugreen", + server=name, + itemid=itemid, + ) + return None + + def media_statistic( + self, server: Optional[str] = None + ) -> Optional[List[schemas.Statistic]]: + """ + 媒体数量统计 + """ + if server: + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return None + servers = [server_obj] + else: + servers = self.get_instances().values() + + media_statistics = [] + for s in servers: + media_statistic = s.get_medias_count() + if not media_statistic: + continue + media_statistic.user_count = s.get_user_count() + media_statistics.append(media_statistic) + return media_statistics + + def mediaserver_librarys( + self, server: Optional[str] = None, hidden: Optional[bool] = False, **kwargs + ) -> Optional[List[schemas.MediaServerLibrary]]: + """ + 媒体库列表 + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if server_obj: + return server_obj.get_librarys(hidden=hidden) + return None + + def mediaserver_items( + self, + server: str, + library_id: Union[str, int], + start_index: Optional[int] = 0, + limit: Optional[int] = -1, + ) -> Optional[Generator]: + """ + 获取媒体服务器项目列表 + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if server_obj: + return server_obj.get_items(library_id, start_index, limit) + return None + + def mediaserver_iteminfo( + self, server: str, item_id: str + ) -> Optional[schemas.MediaServerItem]: + """ + 媒体库项目详情 + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if server_obj: + return server_obj.get_iteminfo(item_id) + return None + + def mediaserver_tv_episodes( + self, server: str, item_id: Union[str, int] + ) -> Optional[List[schemas.MediaServerSeasonInfo]]: + """ + 获取剧集信息 + """ + if not item_id: + return None + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return None + _, seasoninfo = server_obj.get_tv_episodes(item_id=str(item_id)) + if not seasoninfo: + return [] + return [ + schemas.MediaServerSeasonInfo(season=season, episodes=episodes) + for season, episodes in seasoninfo.items() + ] + + def mediaserver_playing( + self, server: str, count: Optional[int] = 20, **kwargs + ) -> List[schemas.MediaServerPlayItem]: + """ + 获取媒体服务器正在播放信息 + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_resume(num=count) or [] + + def mediaserver_play_url( + self, server: str, item_id: Union[str, int] + ) -> Optional[str]: + """ + 获取媒体库播放地址 + """ + if not item_id: + return None + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return None + return server_obj.get_play_url(str(item_id)) + + def mediaserver_latest( + self, + server: Optional[str] = None, + count: Optional[int] = 20, + **kwargs, + ) -> List[schemas.MediaServerPlayItem]: + """ + 获取媒体服务器最新入库条目 + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_latest(num=count) or [] + + def mediaserver_latest_images( + self, + server: Optional[str] = None, + count: Optional[int] = 20, + remote: Optional[bool] = False, + **kwargs, + ) -> List[str]: + """ + 获取媒体服务器最新入库条目的图片 + """ + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_latest_backdrops(num=count, remote=remote) or [] + + def mediaserver_image_cookies( + self, + server: Optional[str] = None, + image_url: Optional[str] = None, + **kwargs, + ) -> Optional[str | dict]: + """ + 获取绿联影视服务器的图片Cookies + """ + if not image_url: + return None + if server: + server_obj: Optional[Ugreen] = self.get_instance(server) + if not server_obj: + return None + return server_obj.get_image_cookies(image_url) + for server_obj in self.get_instances().values(): + if cookies := server_obj.get_image_cookies(image_url): + return cookies + return None diff --git a/app/modules/ugreen/api.py b/app/modules/ugreen/api.py new file mode 100644 index 00000000..2f682934 --- /dev/null +++ b/app/modules/ugreen/api.py @@ -0,0 +1,746 @@ +import base64 +import uuid +from dataclasses import dataclass +from typing import Any, Dict, Mapping, Optional, Union +from urllib.parse import urlsplit, urlunsplit + +from requests import Session + +from app.log import logger +from app.utils.ugreen_crypto import UgreenCrypto +from app.utils.url import UrlUtils + + +@dataclass +class ApiResult: + code: int = -1 + msg: str = "" + data: Any = None + debug: Optional[str] = None + raw: Optional[dict] = None + + @property + def success(self) -> bool: + return self.code == 200 + + +class Api: + """ + 绿联影视 API 客户端(统一加密通道)。 + + 说明: + 1. 所有业务接口调用都应走 `request()`; + 2. `request()` 会自动将明文查询参数加密为 `encrypt_query`; + 3. 若响应包含 `encrypt_resp_body`,会自动完成解密后再返回。 + """ + + __slots__ = ( + "_host", + "_session", + "_token", + "_static_token", + "_is_ugk", + "_public_key", + "_crypto", + "_username", + "_client_id", + "_client_version", + "_language", + "_ug_agent", + "_timeout", + ) + + def __init__( + self, + host: str, + client_version: str = "76363", + language: str = "zh-CN", + ug_agent: str = "PC/WEB", + timeout: int = 20, + ): + self._host = self._normalize_base_url(host) + self._session = Session() + + self._token: Optional[str] = None + self._static_token: Optional[str] = None + self._is_ugk: bool = False + self._public_key: Optional[str] = None + self._crypto: Optional[UgreenCrypto] = None + self._username: Optional[str] = None + + self._client_id = f"{uuid.uuid4()}-WEB" + self._client_version = client_version + self._language = language + self._ug_agent = ug_agent + self._timeout = timeout + + @property + def host(self) -> str: + return self._host + + @property + def token(self) -> Optional[str]: + return self._token + + @property + def static_token(self) -> Optional[str]: + return self._static_token + + @property + def is_ugk(self) -> bool: + return self._is_ugk + + @property + def public_key(self) -> Optional[str]: + return self._public_key + + def close(self): + """ + 关闭底层 HTTP 会话。 + """ + self._session.close() + + @staticmethod + def _normalize_base_url(host: str) -> str: + if not host: + return "" + host = UrlUtils.standardize_base_url(host).rstrip("/") + parsed = urlsplit(host) + return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/") + + @staticmethod + def _decode_public_key(raw: Optional[str]) -> Optional[str]: + if not raw: + return None + value = str(raw).strip() + if not value: + return None + if "BEGIN" in value: + return value + try: + return base64.b64decode(value).decode("utf-8") + except Exception: + return None + + @staticmethod + def _extract_rsa_token(resp_json: dict, headers: Mapping[str, str]) -> Optional[str]: + token = headers.get("x-rsa-token") or headers.get("X-Rsa-Token") + if token: + return token + token = resp_json.get("xRsaToken") or resp_json.get("x-rsa-token") + if token: + return token + data = resp_json.get("data") if isinstance(resp_json, Mapping) else None + if isinstance(data, Mapping): + return data.get("xRsaToken") or data.get("x-rsa-token") + return None + + def _common_headers(self) -> dict[str, str]: + """ + 获取绿联 Web 端通用请求头。 + """ + return { + "Accept": "application/json, text/plain, */*", + "Client-Id": self._client_id, + "Client-Version": self._client_version, + "UG-Agent": self._ug_agent, + "X-Specify-Language": self._language, + } + + def _request_json( + self, + url: str, + method: str = "GET", + headers: Optional[dict] = None, + params: Optional[dict] = None, + json_data: Optional[dict] = None, + ) -> Optional[dict]: + """ + 发送 HTTP 请求并尝试解析为 JSON。 + """ + try: + method = method.upper() + if method == "POST": + resp = self._session.post( + url=url, + headers=headers, + params=params, + json=json_data, + timeout=self._timeout, + verify=False, + ) + else: + resp = self._session.get( + url=url, + headers=headers, + params=params, + timeout=self._timeout, + verify=False, + ) + return resp.json() + except Exception as err: + logger.error(f"请求绿联接口失败:{url} {err}") + return None + + @staticmethod + def _build_result(payload: Any) -> ApiResult: + if not isinstance(payload, Mapping): + return ApiResult(code=-1, msg="响应格式错误", raw=None) + code = payload.get("code") + try: + code = int(code) + except Exception: + code = -1 + return ApiResult( + code=code, + msg=str(payload.get("msg") or ""), + data=payload.get("data"), + debug=payload.get("debug"), + raw=dict(payload), + ) + + def login(self, username: str, password: str, keepalive: bool = True) -> Optional[str]: + """ + 登录绿联账号并初始化加密上下文。 + + :param username: 用户名 + :param password: 密码(会先做 RSA 分段加密) + :param keepalive: 是否保持登录 + :return: 登录成功返回 token + """ + if not username or not password: + return None + + headers = self._common_headers() + + try: + check_resp = self._session.post( + url=f"{self._host}/ugreen/v1/verify/check", + headers=headers, + json={"username": username}, + timeout=self._timeout, + verify=False, + ) + check_json = check_resp.json() + except Exception as err: + logger.error(f"绿联获取登录公钥失败:{err}") + return None + + check_result = self._build_result(check_json) + if not check_result.success: + logger.error(f"绿联获取登录公钥失败:{check_result.msg}") + return None + + rsa_token = self._extract_rsa_token(check_json, check_resp.headers) + login_public_key = self._decode_public_key(rsa_token) + if not login_public_key: + logger.error("绿联获取登录公钥失败:公钥为空") + return None + + encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long(password) + login_json = self._request_json( + url=f"{self._host}/ugreen/v1/verify/login", + method="POST", + headers=headers, + json_data={ + "username": username, + "password": encrypted_password, + "keepalive": keepalive, + "otp": True, + "is_simple": True, + }, + ) + if not login_json: + return None + + login_result = self._build_result(login_json) + if not login_result.success or not isinstance(login_result.data, Mapping): + logger.error(f"绿联登录失败:{login_result.msg}") + return None + + token = str(login_result.data.get("token") or "").strip() + public_key = self._decode_public_key(str(login_result.data.get("public_key") or "")) + if not token or not public_key: + logger.error("绿联登录失败:未返回 token/public_key") + return None + + self._token = token + static_token = str(login_result.data.get("static_token") or "").strip() + self._static_token = static_token or self._token + self._is_ugk = bool(login_result.data.get("is_ugk")) + self._public_key = public_key + self._crypto = UgreenCrypto( + public_key=self._public_key, + token=self._token, + client_id=self._client_id, + client_version=self._client_version, + ug_agent=self._ug_agent, + language=self._language, + ) + self._username = username + return self._token + + def export_session_state(self) -> Optional[dict]: + """ + 导出当前登录会话,供持久化存储使用。 + """ + if not self._token or not self._public_key: + return None + return { + "token": self._token, + "static_token": self._static_token, + "is_ugk": self._is_ugk, + "public_key": self._public_key, + "username": self._username, + "client_id": self._client_id, + "client_version": self._client_version, + "language": self._language, + "ug_agent": self._ug_agent, + "cookies": self._session.cookies.get_dict(), + } + + def import_session_state(self, state: Mapping[str, Any]) -> bool: + """ + 从持久化数据恢复登录会话,避免重复登录。 + """ + if not isinstance(state, Mapping): + return False + + token = str(state.get("token") or "").strip() + public_key = self._decode_public_key(str(state.get("public_key") or "")) + if not token or not public_key: + return False + + static_token = str(state.get("static_token") or "").strip() + is_ugk = bool(state.get("is_ugk")) + + # 会话可能与 client_id 绑定,需恢复原客户端信息 + client_id = str(state.get("client_id") or "").strip() + if client_id: + self._client_id = client_id + + client_version = str(state.get("client_version") or "").strip() + if client_version: + self._client_version = client_version + + language = str(state.get("language") or "").strip() + if language: + self._language = language + + ug_agent = str(state.get("ug_agent") or "").strip() + if ug_agent: + self._ug_agent = ug_agent + + username = str(state.get("username") or "").strip() + self._username = username or None + + cookies = state.get("cookies") + if isinstance(cookies, Mapping): + try: + self._session.cookies.update( + { + str(k): str(v) + for k, v in cookies.items() + if k is not None and v is not None + } + ) + except Exception: + pass + + self._token = token + self._static_token = static_token or self._token + self._is_ugk = is_ugk + self._public_key = public_key + self._crypto = UgreenCrypto( + public_key=self._public_key, + token=self._token, + client_id=self._client_id, + client_version=self._client_version, + ug_agent=self._ug_agent, + language=self._language, + ) + return True + + def logout(self): + """ + 登出并清理本地认证状态。 + """ + if not self._token or not self._crypto: + return + try: + req = self._crypto.build_encrypted_request( + url=f"{self._host}/ugreen/v1/verify/logout", + method="GET", + params={}, + ) + self._session.get( + req.url, + headers=req.headers, + params=req.params, + timeout=self._timeout, + verify=False, + ) + except Exception: + pass + self._token = None + self._static_token = None + self._is_ugk = False + self._public_key = None + self._crypto = None + self._username = None + + def request( + self, + path: str, + method: str = "GET", + params: Optional[dict] = None, + data: Optional[dict] = None, + ) -> ApiResult: + """ + 统一请求入口。 + + 核心行为: + 1. 自动把 `params` 明文序列化并加密为 `encrypt_query`; + 2. 自动注入绿联安全头(`X-Ugreen-*`); + 3. 对 `POST/PUT/PATCH` 的 JSON 体加密; + 4. 自动解密 `encrypt_resp_body`。 + + :param path: `/ugreen/` 后的相对路径,例如 `v1/video/homepage/media_list` + :param method: HTTP 方法 + :param params: 明文查询参数(无需自己处理 encrypt_query) + :param data: 明文 JSON 请求体(自动加密) + """ + if not self._crypto: + return ApiResult(code=-1, msg="未登录") + + api_path = path.strip("/") + # 由加密工具自动构建 encrypt_query 与加密请求体 + req = self._crypto.build_encrypted_request( + url=f"{self._host}/ugreen/{api_path}", + method=method.upper(), + params=params or {}, + data=data, + encrypt_body=method.upper() in {"POST", "PUT", "PATCH"}, + ) + + payload = self._request_json( + url=req.url, + method=method, + headers=req.headers, + params=req.params, + json_data=req.json, + ) + if payload is None: + return ApiResult(code=-1, msg="接口请求失败") + + # 响应若包含 encrypt_resp_body,这里会自动解密 + decrypted = self._crypto.decrypt_response(payload, req.aes_key) + return self._build_result(decrypted) + + def current_user(self) -> Optional[dict]: + """ + 获取当前登录用户信息。 + """ + result = self.request("v1/user/current/user") + if not result.success or not isinstance(result.data, Mapping): + return None + return dict(result.data) + + def media_list(self) -> list[dict]: + """ + 获取首页媒体库列表(`media_lib_info_list`)。 + """ + result = self.request("v1/video/homepage/media_list") + if not result.success or not isinstance(result.data, Mapping): + return [] + items = result.data.get("media_lib_info_list") + return items if isinstance(items, list) else [] + + def media_lib_users(self) -> list[dict]: + """ + 获取媒体库用户列表。 + """ + result = self.request("v1/video/media_lib/get_user_list") + if not result.success or not isinstance(result.data, Mapping): + return [] + users = result.data.get("user_info_arr") + return users if isinstance(users, list) else [] + + def recently_played(self, page: int = 1, page_size: int = 12) -> Optional[dict]: + """ + 获取继续观看列表。 + """ + result = self.request( + "v1/video/recently_played/get", + params={ + "page": page, + "page_size": page_size, + "language": self._language, + "create_time_order": "false", + }, + ) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def recently_updated(self, page: int = 1, page_size: int = 20) -> Optional[dict]: + """ + 获取最近更新列表。 + """ + result = self.request( + "v1/video/recently_update/get", + params={ + "page": page, + "page_size": page_size, + "language": self._language, + "create_time_order": "false", + }, + ) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def recently_played_info(self, item_id: Union[str, int]) -> Optional[dict]: + """ + 获取单个视频的播放状态与基础详情信息。 + """ + result = self.request( + "v1/video/recently_played/info", + params={ + "ug_video_info_id": item_id, + "version_control": "true", + }, + ) + if result.code in {200, 1303} and isinstance(result.data, Mapping): + return dict(result.data) + return None + + def search(self, keyword: str, offset: int = 0, limit: int = 200) -> Optional[dict]: + """ + 搜索媒体(电影/剧集)。 + """ + result = self.request( + "v1/video/search", + params={ + "language": self._language, + "search_type": 1, + "offset": offset, + "limit": limit, + "keyword": keyword, + }, + ) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def video_all(self, classification: int, page: int = 1, page_size: int = 20) -> Optional[dict]: + """ + 获取 `v1/video/all` 分类列表。 + + 常用分类: + -102: 电影 + -103: 电视剧 + """ + result = self.request( + "v1/video/all", + params={ + "page": page, + "pageSize": page_size, + "classification": classification, + "sort_type": 2, + "order_type": 2, + "release_date_begin": -9999999999, + "release_date_end": -9999999999, + "identify_status": 0, + "watch_status": -1, + "ug_style_id": 0, + "ug_country_id": 0, + "clarity": -1, + }, + ) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def poster_wall_get_folder( + self, + path: Optional[str] = None, + page: int = 1, + page_size: int = 100, + sort_type: int = 1, + order_type: int = 1, + ) -> Optional[dict]: + """ + 获取海报墙文件夹与条目(可按目录路径递归展开)。 + """ + params: Dict[str, Any] = { + "page": page, + "page_size": page_size, + "sort_type": sort_type, + "order_type": order_type, + } + if path: + params["path"] = path + result = self.request("v1/video/poster_wall/media_lib/get_folder", params=params) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def get_movie( + self, + item_id: Union[str, int], + media_lib_set_id: Union[str, int], + path: Optional[str] = None, + folder_path: Optional[str] = None, + ) -> Optional[dict]: + """ + 获取电影详情。 + """ + params: Dict[str, Any] = { + "id": item_id, + "media_lib_set_id": media_lib_set_id, + "fileVersion": "true", + } + if path: + params["path"] = path + if folder_path: + params["folder_path"] = folder_path + result = self.request("v1/video/details/getMovie", params=params) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def get_tv(self, item_id: Union[str, int], folder_path: str = "ALL") -> Optional[dict]: + """ + 获取剧集详情(含季/集信息)。 + """ + result = self.request( + "v2/video/details/getTV", + params={ + "ug_video_info_id": item_id, + "folder_path": folder_path, + }, + ) + return result.data if result.success and isinstance(result.data, Mapping) else None + + def scan(self, media_lib_set_id: Union[str, int], scan_type: int = 2, op_type: int = 2) -> bool: + """ + 触发媒体库扫描。 + + :param media_lib_set_id: 媒体库 ID + :param scan_type: 扫描类型(1: 新添加和修改, 2: 补充缺失, 3: 覆盖扫描) + :param op_type: 操作类型(网页端常用 2) + """ + result = self.request( + "v1/video/media_lib/scan", + params={ + "op_type": op_type, + "media_lib_set_id": media_lib_set_id, + "media_lib_scan_type": scan_type, + }, + ) + return result.success + + def scan_status(self, only_brief: bool = True) -> list[dict]: + """ + 获取媒体库扫描状态。 + """ + result = self.request( + "v1/video/media_lib/scan/status", + params={"only_brief": "true" if only_brief else "false"}, + ) + if not result.success or not isinstance(result.data, Mapping): + return [] + arr = result.data.get("media_lib_scan_status_arr") + return arr if isinstance(arr, list) else [] + + def preferences_all(self) -> Optional[Any]: + """ + 获取影视偏好设置(`v1/video/preferences/all`)。 + """ + result = self.request("v1/video/preferences/all") + return result.data if result.success else None + + def history_get(self, num: int = 10) -> Optional[Any]: + """ + 获取历史记录(`v1/video/history/get`)。 + """ + result = self.request("v1/video/history/get", params={"num": num}) + return result.data if result.success else None + + def data_source_get_config(self) -> Optional[Any]: + """ + 获取数据源配置(`v1/video/data_source/get_config`)。 + """ + result = self.request("v1/video/data_source/get_config") + return result.data if result.success else None + + def homepage_slider( + self, language: Optional[str] = None, app_name: str = "web" + ) -> Optional[Any]: + """ + 获取首页轮播数据(`v1/video/homepage/slider`)。 + """ + result = self.request( + "v1/video/homepage/slider", + params={ + "language": language or self._language, + "app_name": app_name, + }, + ) + return result.data if result.success else None + + def media_lib_guide_init(self) -> Optional[Any]: + """ + 获取媒体库引导初始化信息(`v1/video/media_lib/guide_init`)。 + """ + result = self.request("v1/video/media_lib/guide_init") + return result.data if result.success else None + + def media_lib_filter_options( + self, media_type: int = 0, language: Optional[str] = None + ) -> Optional[Any]: + """ + 获取媒体库筛选项(`v1/video/media_lib/filter/options`)。 + """ + result = self.request( + "v1/video/media_lib/filter/options", + params={ + "type": media_type, + "language": language or self._language, + }, + ) + return result.data if result.success else None + + def guide(self, guide_position: int = 1, client_type: int = 1) -> Optional[Any]: + """ + 获取引导位数据(`v1/video/guide`)。 + """ + result = self.request( + "v1/video/guide", + params={ + "guide_position": guide_position, + "client_type": client_type, + }, + ) + return result.data if result.success else None + + def homepage_v2(self, language: Optional[str] = None) -> Optional[Any]: + """ + 获取新版首页聚合数据(`v2/video/homepage`)。 + """ + result = self.request( + "v2/video/homepage", + params={"language": language or self._language}, + ) + return result.data if result.success else None + + def media_lib_init_user_permission(self) -> Optional[Any]: + """ + 初始化用户媒体库权限(`v1/video/media_lib/init_user_permission`)。 + """ + result = self.request("v1/video/media_lib/init_user_permission") + return result.data if result.success else None + + def media_lib_get_all( + self, req_type: int = 2, language: Optional[str] = None + ) -> Optional[Any]: + """ + 获取全部媒体库集合(`v1/video/media_lib/get_all`)。 + """ + result = self.request( + "v1/video/media_lib/get_all", + params={ + "mediaLib_get_all_req_type": req_type, + "language": language or self._language, + }, + ) + return result.data if result.success else None diff --git a/app/modules/ugreen/ugreen.py b/app/modules/ugreen/ugreen.py new file mode 100644 index 00000000..66786c6a --- /dev/null +++ b/app/modules/ugreen/ugreen.py @@ -0,0 +1,944 @@ +import hashlib +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Generator, List, Mapping, Optional, Union +from urllib.parse import parse_qs, urlparse + +from app import schemas +from app.db.systemconfig_oper import SystemConfigOper +from app.log import logger +from app.modules.ugreen.api import Api +from app.schemas import MediaType +from app.schemas.types import SystemConfigKey +from app.utils.url import UrlUtils + + +class Ugreen: + _username: Optional[str] = None + _password: Optional[str] = None + + _userinfo: Optional[dict] = None + _host: Optional[str] = None + _playhost: Optional[str] = None + + _libraries: dict[str, dict] = {} + _library_paths: dict[str, str] = {} + _sync_libraries: List[str] = [] + _scan_type: int = 2 + + _api: Optional[Api] = None + + def __init__( + self, + host: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + play_host: Optional[str] = None, + sync_libraries: Optional[list] = None, + scan_mode: Optional[Union[str, int]] = None, + scan_type: Optional[Union[str, int]] = None, + **kwargs, + ): + if not host or not username or not password: + logger.error("绿联影视配置不完整!!") + return + + self._host = host + self._username = username + self._password = password + self._sync_libraries = sync_libraries or [] + # 绿联媒体库扫描模式: + # 1 新添加和修改、2 补充缺失、3 覆盖扫描 + self._scan_type = self.__resolve_scan_type(scan_mode=scan_mode, scan_type=scan_type) + + if play_host: + self._playhost = UrlUtils.standardize_base_url(play_host).rstrip("/") + + if not self.reconnect(): + logger.error(f"请检查服务端地址 {host}") + + @property + def api(self) -> Optional[Api]: + return self._api + + def close(self): + self.disconnect() + + def is_configured(self) -> bool: + return bool(self._host and self._username and self._password) + + def is_authenticated(self) -> bool: + return ( + self.is_configured() + and self._api is not None + and self._api.token is not None + and self._userinfo is not None + ) + + def is_inactive(self) -> bool: + if not self.is_authenticated(): + return True + self._userinfo = self._api.current_user() if self._api else None + return self._userinfo is None + + def __session_cache_key(self) -> str: + """ + 生成当前绿联实例的会话缓存键(基于 host + username)。 + """ + normalized_host = UrlUtils.standardize_base_url(self._host or "").rstrip("/").lower() + username = (self._username or "").strip().lower() + raw = f"{normalized_host}|{username}" + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + def __password_digest(self) -> str: + """ + 存储密码摘要用于检测配置是否变更,避免明文落盘。 + """ + return hashlib.sha256((self._password or "").encode("utf-8")).hexdigest() + + @staticmethod + def __load_all_session_cache() -> dict: + sessions = SystemConfigOper().get(SystemConfigKey.UgreenSessionCache) + return sessions if isinstance(sessions, dict) else {} + + @staticmethod + def __save_all_session_cache(sessions: dict): + SystemConfigOper().set(SystemConfigKey.UgreenSessionCache, sessions) + + def __remove_persisted_session(self): + cache_key = self.__session_cache_key() + sessions = self.__load_all_session_cache() + if cache_key in sessions: + sessions.pop(cache_key, None) + self.__save_all_session_cache(sessions) + + def __save_persisted_session(self): + if not self._api: + return + session_state = self._api.export_session_state() + if not session_state: + return + + sessions = self.__load_all_session_cache() + cache_key = self.__session_cache_key() + sessions[cache_key] = { + **session_state, + "host": UrlUtils.standardize_base_url(self._host or "").rstrip("/"), + "username": self._username, + "password_digest": self.__password_digest(), + "updated_at": int(datetime.now().timestamp()), + } + self.__save_all_session_cache(sessions) + + def __restore_persisted_session(self) -> bool: + cache_key = self.__session_cache_key() + sessions = self.__load_all_session_cache() + cached = sessions.get(cache_key) + if not isinstance(cached, Mapping): + return False + + # 配置变更(尤其密码变更)后,不复用旧会话 + if cached.get("password_digest") != self.__password_digest(): + logger.info(f"绿联影视 {self._username} 检测到密码变更,清理旧会话缓存") + self.__remove_persisted_session() + return False + + api = Api(host=self._host) + if not api.import_session_state(cached): + api.close() + self.__remove_persisted_session() + return False + + userinfo = api.current_user() + if not userinfo: + # 会话失效,清理缓存并走正常登录 + api.close() + self.__remove_persisted_session() + logger.info(f"绿联影视 {self._username} 持久化会话已失效,准备重新登录") + return False + + self._api = api + self._userinfo = userinfo + logger.debug(f"{self._username} 已复用绿联影视持久化会话") + return True + + def reconnect(self) -> bool: + if not self.is_configured(): + return False + + # 关闭旧连接(不主动登出,避免破坏可复用会话) + self.disconnect(logout=False) + + if self.__restore_persisted_session(): + self.get_librarys() + return True + + self._api = Api(host=self._host) + if self._api.login(self._username, self._password) is None: + self.__remove_persisted_session() + return False + + self._userinfo = self._api.current_user() + if not self._userinfo: + self.__remove_persisted_session() + return False + + # 登录成功后持久化参数,下次优先复用 + self.__save_persisted_session() + logger.debug(f"{self._username} 成功登录绿联影视") + self.get_librarys() + return True + + def disconnect(self, logout: bool = False): + if self._api: + if logout: + # 显式登出时同步清理本地缓存 + self._api.logout() + self.__remove_persisted_session() + self._api.close() + self._api = None + self._userinfo = None + logger.debug(f"{self._username} 已断开绿联影视") + + @staticmethod + def __normalize_dir_path(path: Union[str, Path, None]) -> str: + if path is None: + return "" + value = str(path).replace("\\", "/").rstrip("/") + return value + + @staticmethod + def __is_subpath(path: Union[str, Path, None], parent: Union[str, Path, None]) -> bool: + path_str = Ugreen.__normalize_dir_path(path) + parent_str = Ugreen.__normalize_dir_path(parent) + if not path_str or not parent_str: + return False + return path_str == parent_str or path_str.startswith(parent_str + "/") + + def __build_image_stream_url(self, source_url: str, size: int = 1) -> Optional[str]: + """ + 通过绿联 getImaStream 中转图片,规避 scraper.ugnas.com 403 问题。 + """ + if not self._api: + return None + + auth_token = self._api.static_token or self._api.token + if not auth_token: + return None + + params = { + "app_name": "web", + "name": source_url, + "size": size, + } + if self._api.is_ugk: + params["ugk"] = auth_token + else: + params["token"] = auth_token + + return UrlUtils.combine_url( + host=self._api.host, + path="/ugreen/v2/video/getImaStream", + query=params, + ) + + def __resolve_image(self, path: Optional[str]) -> Optional[str]: + if not path: + return None + if path.startswith("http://") or path.startswith("https://"): + parsed = urlparse(path) + if parsed.netloc.lower() == "scraper.ugnas.com": + # scraper 链接优先改为本机 getImaStream,避免签名过期导致 403 + if image_stream_url := self.__build_image_stream_url(path): + return image_stream_url + + # 绿联返回的 scraper.ugnas.com 图片常带 auth_key 时效签名, + # 过期后会直接 403。这里提前过滤,避免前端出现裂图。 + if self.__is_expired_signed_image(path): + return None + return path + # 绿联本地图片路径需要额外鉴权头,MP图片代理当前仅支持Cookie,故先忽略本地路径。 + return None + + @staticmethod + def __is_expired_signed_image(url: str) -> bool: + """ + 判断绿联 scraper 签名图是否已过期。 + + auth_key 结构通常为: + `{过期时间戳}-{随机串}-...` + """ + try: + parsed = urlparse(url) + if parsed.netloc.lower() != "scraper.ugnas.com": + return False + auth_key = parse_qs(parsed.query).get("auth_key", [None])[0] + if not auth_key: + return False + expire_part = str(auth_key).split("-", 1)[0] + expire_ts = int(expire_part) + now_ts = int(datetime.now().timestamp()) + return expire_ts <= now_ts + except Exception: + return False + + @staticmethod + def __parse_year(video_info: dict) -> Optional[Union[str, int]]: + year = video_info.get("year") + if isinstance(year, int) and year > 0: + return year + release_date = video_info.get("release_date") + if isinstance(release_date, (int, float)) and release_date > 0: + try: + return datetime.fromtimestamp(release_date).year + except Exception: + return None + return None + + @staticmethod + def __map_item_type(video_type: Any) -> Optional[str]: + if video_type == 2: + return "Series" + if video_type == 1: + return "Movie" + if video_type == 3: + return "Collection" + if video_type == 0: + return "Folder" + return "Video" + + @staticmethod + def __build_media_server_item(video_info: dict, play_status: Optional[dict] = None): + user_state = schemas.MediaServerItemUserState() + if isinstance(play_status, dict): + progress = play_status.get("progress") + watch_status = play_status.get("watch_status") + if watch_status == 2: + user_state.played = True + if isinstance(progress, (int, float)) and progress > 0: + user_state.resume = progress < 1 + user_state.percentage = progress * 100.0 + last_play_time = play_status.get("last_access_time") or play_status.get("LastPlayTime") + if isinstance(last_play_time, (int, float)) and last_play_time > 0: + user_state.last_played_date = str(int(last_play_time)) + + tmdb_id = video_info.get("tmdb_id") + if not isinstance(tmdb_id, int) or tmdb_id <= 0: + tmdb_id = None + + item_id = video_info.get("ug_video_info_id") + if item_id is None: + return None + + return schemas.MediaServerItem( + server="ugreen", + library=video_info.get("media_lib_set_id"), + item_id=str(item_id), + item_type=Ugreen.__map_item_type(video_info.get("type")), + title=video_info.get("name"), + original_title=video_info.get("original_name"), + year=Ugreen.__parse_year(video_info), + tmdbid=tmdb_id, + user_state=user_state, + ) + + def __build_root_url(self) -> str: + """ + 统一返回 NAS Web 根地址作为跳转链接,避免失效深链。 + """ + host = self._playhost or (self._api.host if self._api else "") + if not host: + return "" + return f"{host.rstrip('/')}/" + + def __build_play_url(self, item_id: Union[str, int], video_type: Any, media_lib_set_id: Any) -> str: + # 绿联深链在部分版本会失效,统一回落到 NAS 根地址。 + return self.__build_root_url() + + def __build_play_item_from_wrapper(self, wrapper: dict) -> Optional[schemas.MediaServerPlayItem]: + video_info = wrapper.get("video_info") if isinstance(wrapper.get("video_info"), dict) else wrapper + if not isinstance(video_info, dict): + return None + + item_id = video_info.get("ug_video_info_id") + if item_id is None: + return None + + play_status = wrapper.get("play_status") if isinstance(wrapper.get("play_status"), dict) else {} + progress = play_status.get("progress") if isinstance(play_status.get("progress"), (int, float)) else 0 + + if video_info.get("type") == 2: + subtitle = play_status.get("tv_name") or "剧集" + media_type = MediaType.TV.value + else: + subtitle = "电影" if video_info.get("type") == 1 else "视频" + media_type = MediaType.MOVIE.value + + image = self.__resolve_image(video_info.get("poster_path")) or self.__resolve_image( + video_info.get("backdrop_path") + ) + + return schemas.MediaServerPlayItem( + id=str(item_id), + title=video_info.get("name"), + subtitle=subtitle, + type=media_type, + image=image, + link=self.__build_play_url(item_id, video_info.get("type"), video_info.get("media_lib_set_id")), + percent=max(0.0, min(100.0, progress * 100.0)), + server_type="ugreen", + use_cookies=False, + ) + + def __infer_library_type(self, name: str, path: Optional[str]) -> str: + name = name or "" + path = path or "" + if "电视剧" in path or any(key in name for key in ["剧", "综艺", "动漫", "纪录片"]): + return MediaType.TV.value + if "电影" in path or "电影" in name: + return MediaType.MOVIE.value + return MediaType.UNKNOWN.value + + def __is_library_blocked(self, library_id: str) -> bool: + return ( + True + if ( + self._sync_libraries + and "all" not in self._sync_libraries + and str(library_id) not in self._sync_libraries + ) + else False + ) + + @staticmethod + def __resolve_scan_type( + scan_mode: Optional[Union[str, int]] = None, + scan_type: Optional[Union[str, int]] = None, + ) -> int: + """ + 解析绿联扫描模式并转为 `media_lib_scan_type`。 + + 支持值: + - 1 / new_and_modified: 新添加和修改 + - 2 / supplement_missing: 补充缺失 + - 3 / full_override: 覆盖扫描 + """ + # 优先使用显式 scan_type 数值配置。 + for value in (scan_type, scan_mode): + try: + parsed = int(value) # type: ignore[arg-type] + if parsed in (1, 2, 3): + return parsed + except Exception: + pass + + mode = str(scan_mode or "").strip().lower() + mode_map = { + "new_and_modified": 1, + "new_modified": 1, + "add": 1, + "added": 1, + "new": 1, + "scan_new_modified": 1, + "supplement_missing": 2, + "supplement": 2, + "additional": 2, + "missing": 2, + "scan_missing": 2, + "full_override": 3, + "override": 3, + "cover": 3, + "replace": 3, + "scan_override": 3, + } + return mode_map.get(mode, 2) + + def __scan_library(self, library_id: str, scan_type: Optional[int] = None) -> bool: + if not self._api: + return False + return self._api.scan( + media_lib_set_id=library_id, + scan_type=scan_type or self._scan_type, + op_type=2, + ) + + def __load_library_paths(self) -> dict[str, str]: + if not self._api: + return {} + + paths: dict[str, str] = {} + page = 1 + while True: + data = self._api.poster_wall_get_folder(page=page, page_size=100) + if not data: + break + + for folder in data.get("folder_arr") or []: + lib_id = folder.get("media_lib_set_id") + lib_path = folder.get("path") + if lib_id is not None and lib_path: + paths[str(lib_id)] = str(lib_path) + + if data.get("is_last_page") is True: + break + page += 1 + + return paths + + def get_librarys(self, hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]: + if not self.is_authenticated() or not self._api: + return [] + + media_libs = self._api.media_list() + self._library_paths = self.__load_library_paths() + libraries = [] + self._libraries = {} + + for lib in media_libs: + lib_id = str(lib.get("media_lib_set_id")) + if hidden and self.__is_library_blocked(lib_id): + continue + + lib_name = lib.get("media_name") or "" + lib_path = self._library_paths.get(lib_id) + library_type = self.__infer_library_type(lib_name, lib_path) + + poster_paths = lib.get("poster_paths") or [] + backdrop_paths = lib.get("backdrop_paths") or [] + image_list = list( + filter( + None, + [self.__resolve_image(p) for p in [*poster_paths, *backdrop_paths]], + ) + ) + + self._libraries[lib_id] = { + "id": lib_id, + "name": lib_name, + "path": lib_path, + "type": library_type, + "video_count": lib.get("video_count") or 0, + } + + libraries.append( + schemas.MediaServerLibrary( + server="ugreen", + id=lib_id, + name=lib_name, + type=library_type, + path=lib_path, + image_list=image_list, + link=self.__build_root_url(), + server_type="ugreen", + use_cookies=False, + ) + ) + + return libraries + + def get_user_count(self) -> int: + if not self.is_authenticated() or not self._api: + return 0 + users = self._api.media_lib_users() + return len(users) + + def get_medias_count(self) -> schemas.Statistic: + if not self.is_authenticated() or not self._api: + return schemas.Statistic() + + movie_data = self._api.video_all(classification=-102, page=1, page_size=1) or {} + tv_data = self._api.video_all(classification=-103, page=1, page_size=1) or {} + + return schemas.Statistic( + movie_count=int(movie_data.get("total_num") or 0), + tv_count=int(tv_data.get("total_num") or 0), + # 绿联当前不统计剧集总数,返回 None 由前端展示“未获取”。 + episode_count=None, + ) + + def authenticate(self, username: str, password: str) -> Optional[str]: + if not username or not password or not self._host: + return None + + api = Api(self._host) + try: + return api.login(username, password) + finally: + api.logout() + api.close() + + def __extract_video_info_list(self, bucket: Any) -> list[dict]: + if not isinstance(bucket, Mapping): + return [] + video_arr = bucket.get("video_arr") + if not isinstance(video_arr, list): + return [] + result = [] + for item in video_arr: + if not isinstance(item, Mapping): + continue + info = item.get("video_info") + if isinstance(info, Mapping): + result.append(dict(info)) + return result + + def get_movies( + self, title: str, year: Optional[str] = None, tmdb_id: Optional[int] = None + ) -> Optional[List[schemas.MediaServerItem]]: + if not self.is_authenticated() or not self._api or not title: + return None + + data = self._api.search(title) + if not data: + return [] + + movies = [] + for info in self.__extract_video_info_list(data.get("movies_list")): + info_tmdb = info.get("tmdb_id") + if tmdb_id and tmdb_id != info_tmdb: + continue + if title not in [info.get("name"), info.get("original_name")]: + continue + item_year = info.get("year") + if year and str(item_year) != str(year): + continue + media_item = self.__build_media_server_item(info) + if media_item: + movies.append(media_item) + return movies + + def __search_tv_item(self, title: str, year: Optional[str] = None, tmdb_id: Optional[int] = None) -> Optional[dict]: + if not self._api: + return None + data = self._api.search(title) + if not data: + return None + + for info in self.__extract_video_info_list(data.get("tv_list")): + if tmdb_id and tmdb_id != info.get("tmdb_id"): + continue + if title not in [info.get("name"), info.get("original_name")]: + continue + item_year = info.get("year") + if year and str(item_year) != str(year): + continue + return info + return None + + def get_tv_episodes( + self, + item_id: Optional[str] = None, + title: Optional[str] = None, + year: Optional[str] = None, + tmdb_id: Optional[int] = None, + season: Optional[int] = None, + ) -> tuple[Optional[str], Optional[Dict[int, list]]]: + if not self.is_authenticated() or not self._api: + return None, None + + if not item_id: + if not title: + return None, None + if not (tv_info := self.__search_tv_item(title, year, tmdb_id)): + return None, None + found_item_id = tv_info.get("ug_video_info_id") + if found_item_id is None: + return None, None + item_id = str(found_item_id) + else: + item_id = str(item_id) + + item_info = self.get_iteminfo(item_id) + if not item_info: + return None, {} + if tmdb_id and item_info.tmdbid and tmdb_id != item_info.tmdbid: + return None, {} + + tv_detail = self._api.get_tv(item_id, folder_path="ALL") + if not tv_detail: + return None, {} + + season_map = {} + for info in tv_detail.get("season_info") or []: + if not isinstance(info, dict): + continue + category_id = info.get("category_id") + season_num = info.get("season_num") + if category_id and isinstance(season_num, int): + season_map[str(category_id)] = season_num + + season_episodes: Dict[int, list] = {} + for ep in tv_detail.get("tv_info") or []: + if not isinstance(ep, dict): + continue + episode = ep.get("episode") + if not isinstance(episode, int): + continue + season_num = season_map.get(str(ep.get("category_id")), 1) + if season is not None and season_num != season: + continue + season_episodes.setdefault(season_num, []).append(episode) + + for season_num in list(season_episodes.keys()): + season_episodes[season_num] = sorted(set(season_episodes[season_num])) + + return item_id, season_episodes + + def refresh_root_library(self, scan_mode: Optional[Union[str, int]] = None) -> Optional[bool]: + if not self.is_authenticated() or not self._api: + return None + + if not self._libraries: + self.get_librarys() + + scan_type = ( + self.__resolve_scan_type(scan_mode=scan_mode) + if scan_mode is not None + else self._scan_type + ) + results = [] + for lib_id in self._libraries.keys(): + logger.info( + f"刷新媒体库:{self._libraries[lib_id].get('name')}(扫描模式: {scan_type})" + ) + results.append(self.__scan_library(library_id=lib_id, scan_type=scan_type)) + + return all(results) if results else True + + def __match_library_id_by_path(self, path: Optional[Path]) -> Optional[str]: + if path is None: + return None + + path_str = self.__normalize_dir_path(path) + if not self._library_paths: + self.get_librarys() + + for lib_id, lib_path in self._library_paths.items(): + if self.__is_subpath(path_str, lib_path): + return lib_id + return None + + def refresh_library_by_items( + self, + items: List[schemas.RefreshMediaItem], + scan_mode: Optional[Union[str, int]] = None, + ) -> Optional[bool]: + if not self.is_authenticated() or not self._api: + return None + + scan_type = ( + self.__resolve_scan_type(scan_mode=scan_mode) + if scan_mode is not None + else self._scan_type + ) + library_ids = set() + for item in items: + library_id = self.__match_library_id_by_path(item.target_path) + if library_id is None: + return self.refresh_root_library(scan_mode=scan_mode) + library_ids.add(library_id) + + for library_id in library_ids: + lib_name = self._libraries.get(library_id, {}).get("name", library_id) + logger.info(f"刷新媒体库:{lib_name}(扫描模式: {scan_type})") + if not self.__scan_library(library_id=library_id, scan_type=scan_type): + return self.refresh_root_library(scan_mode=scan_mode) + + return True + + def get_webhook_message(self, body: Any) -> Optional[schemas.WebhookEventInfo]: + return None + + def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]: + if not self.is_authenticated() or not self._api or not itemid: + return None + + info = self._api.recently_played_info(itemid) + if not info: + return None + + video_info = info.get("video_info") if isinstance(info.get("video_info"), dict) else None + if not video_info or not video_info.get("ug_video_info_id"): + return None + + return self.__build_media_server_item(video_info, info.get("play_status")) + + def _iter_library_videos(self, root_path: str, page_size: int = 100): + if not self._api or not root_path: + return + + queue: List[str] = [root_path] + visited: set[str] = set() + max_paths = 20000 + + while queue and len(visited) < max_paths: + current_path = queue.pop(0) + if current_path in visited: + continue + visited.add(current_path) + + page = 1 + while True: + data = self._api.poster_wall_get_folder( + path=current_path, + page=page, + page_size=page_size, + sort_type=1, + order_type=1, + ) + if not data: + break + + for video in data.get("video_arr") or []: + if isinstance(video, dict): + yield video + + for folder in data.get("folder_arr") or []: + if not isinstance(folder, dict): + continue + sub_path = folder.get("path") + if sub_path and sub_path not in visited: + queue.append(str(sub_path)) + + if data.get("is_last_page") is True: + break + page += 1 + + def get_items( + self, + parent: Union[str, int], + start_index: Optional[int] = 0, + limit: Optional[int] = -1, + ) -> Generator[schemas.MediaServerItem | None | Any, Any, None]: + if not self.is_authenticated() or not self._api: + return None + + library_id = str(parent) + if not self._library_paths: + self.get_librarys() + + root_path = self._library_paths.get(library_id) + if not root_path: + return None + + skip = max(0, start_index or 0) + remain = -1 if limit in [None, -1] else max(0, limit) + + for video in self._iter_library_videos(root_path=root_path): + video_type = video.get("type") + if video_type not in [1, 2]: + continue + + if skip > 0: + skip -= 1 + continue + + item = self.__build_media_server_item(video) + if item: + yield item + if remain != -1: + remain -= 1 + if remain <= 0: + break + + return None + + def get_play_url(self, item_id: str) -> Optional[str]: + if not self.is_authenticated() or not self._api: + return None + + info = self._api.recently_played_info(item_id) + if not info: + return None + + video_info = info.get("video_info") if isinstance(info.get("video_info"), dict) else None + if not video_info: + return None + + return self.__build_play_url( + item_id=item_id, + video_type=video_info.get("type"), + media_lib_set_id=video_info.get("media_lib_set_id"), + ) + + def get_resume(self, num: Optional[int] = 12) -> Optional[List[schemas.MediaServerPlayItem]]: + if not self.is_authenticated() or not self._api: + return None + + page_size = max(1, num or 12) + data = self._api.recently_played(page=1, page_size=page_size) + if not data: + return [] + + ret_resume = [] + for item in data.get("video_arr") or []: + if len(ret_resume) == page_size: + break + if not isinstance(item, dict): + continue + video_info = item.get("video_info") if isinstance(item.get("video_info"), dict) else {} + library_id = str(video_info.get("media_lib_set_id") or "") + if self.__is_library_blocked(library_id): + continue + play_item = self.__build_play_item_from_wrapper(item) + if play_item: + ret_resume.append(play_item) + + return ret_resume + + def get_latest(self, num: int = 20) -> Optional[List[schemas.MediaServerPlayItem]]: + if not self.is_authenticated() or not self._api: + return None + + page_size = max(1, num) + data = self._api.recently_updated(page=1, page_size=page_size) + if not data: + return [] + + latest = [] + for item in data.get("video_arr") or []: + if len(latest) == page_size: + break + if not isinstance(item, dict): + continue + video_info = item.get("video_info") if isinstance(item.get("video_info"), dict) else {} + library_id = str(video_info.get("media_lib_set_id") or "") + if self.__is_library_blocked(library_id): + continue + play_item = self.__build_play_item_from_wrapper(item) + if play_item: + latest.append(play_item) + + return latest + + def get_latest_backdrops(self, num: int = 20, remote: bool = False) -> Optional[List[str]]: + if not self.is_authenticated() or not self._api: + return None + + data = self._api.recently_updated(page=1, page_size=max(1, num)) + if not data: + return [] + + images: List[str] = [] + for item in data.get("video_arr") or []: + if len(images) == num: + break + if not isinstance(item, dict): + continue + + video_info = item.get("video_info") if isinstance(item.get("video_info"), dict) else {} + library_id = str(video_info.get("media_lib_set_id") or "") + if self.__is_library_blocked(library_id): + continue + + image = self.__resolve_image(video_info.get("backdrop_path")) or self.__resolve_image( + video_info.get("poster_path") + ) + if image: + images.append(image) + + return images + + def get_image_cookies(self, image_url: str): + # 绿联图片流接口依赖加密鉴权头,当前图片代理仅支持Cookie注入。 + return None diff --git a/app/schemas/mediaserver.py b/app/schemas/mediaserver.py index daf88112..3c8dccae 100644 --- a/app/schemas/mediaserver.py +++ b/app/schemas/mediaserver.py @@ -14,7 +14,7 @@ class ExistMediaInfo(BaseModel): type: Optional[MediaType] = None # 季 seasons: Optional[Dict[int, list]] = Field(default_factory=dict) - # 媒体服务器类型:plex、jellyfin、emby、trimemedia + # 媒体服务器类型:plex、jellyfin、emby、trimemedia、ugreen server_type: Optional[str] = None # 媒体服务器名称 server: Optional[str] = None diff --git a/app/schemas/system.py b/app/schemas/system.py index 6097d441..c47e8554 100644 --- a/app/schemas/system.py +++ b/app/schemas/system.py @@ -29,7 +29,7 @@ class MediaServerConf(BaseModel): # 名称 name: Optional[str] = None - # 类型 emby/jellyfin/plex + # 类型 emby/jellyfin/plex/trimemedia/ugreen type: Optional[str] = None # 配置 config: Optional[dict] = Field(default_factory=dict) diff --git a/app/schemas/types.py b/app/schemas/types.py index c9073b0f..15076470 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -219,6 +219,8 @@ class SystemConfigKey(Enum): PluginInstallReport = "PluginInstallReport" # 配置向导状态 SetupWizardState = "SetupWizardState" + # 绿联影视登录会话缓存 + UgreenSessionCache = "UgreenSessionCache" # 处理进度Key字典 @@ -309,6 +311,8 @@ class MediaServerType(Enum): Plex = "Plex" # 飞牛影视 TrimeMedia = "TrimeMedia" + # 绿联影视 + Ugreen = "Ugreen" # 识别器类型 diff --git a/app/utils/ugreen_crypto.py b/app/utils/ugreen_crypto.py index df0c0c4b..120d2506 100644 --- a/app/utils/ugreen_crypto.py +++ b/app/utils/ugreen_crypto.py @@ -184,6 +184,14 @@ class UgreenCrypto: encrypt_token: bool = True, encrypt_body: bool = True, ) -> UgreenEncryptedRequest: + """ + 构建绿联加密请求。 + + 关键点: + - 传入的是明文 `params`; + - 方法内部会将其序列化并加密成 `encrypt_query`; + - 业务侧不需要、也不应该手工拼接 `encrypt_query`。 + """ parsed = urlsplit(url) clean_url = urlunsplit( (parsed.scheme, parsed.netloc, parsed.path, "", parsed.fragment) @@ -214,6 +222,7 @@ class UgreenCrypto: return UgreenEncryptedRequest( url=clean_url, headers=headers, + # 绿联接口约定:查询参数统一透传为 encrypt_query params={"encrypt_query": encrypted_query}, json=req_json, aes_key=aes_key, @@ -231,4 +240,3 @@ class UgreenCrypto: return json.loads(plain) except json.JSONDecodeError: return plain - diff --git a/tests/manual/ugreen_media_cli.py b/tests/manual/ugreen_media_cli.py new file mode 100644 index 00000000..973a6589 --- /dev/null +++ b/tests/manual/ugreen_media_cli.py @@ -0,0 +1,299 @@ +from __future__ import annotations + +import argparse +import base64 +import getpass +import json +import os +import sys +import uuid +from typing import Any, Mapping +from urllib.parse import urlsplit, urlunsplit + +# 兼容直接运行脚本:避免 app/utils 被放在 sys.path 首位导致标准库模块被同名文件遮蔽 +if __name__ == "__main__" and __package__ is None: + script_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.abspath(os.path.join(script_dir, "..", "..")) + if script_dir in sys.path: + sys.path.remove(script_dir) + if project_root not in sys.path: + sys.path.insert(0, project_root) + +import requests + +from app.utils.ugreen_crypto import UgreenCrypto + + +class UgreenLoginError(Exception): + pass + + +def _normalize_base_url(raw: str) -> str: + value = (raw or "").strip() + if not value: + raise UgreenLoginError("服务器地址不能为空") + if not value.startswith(("http://", "https://")): + value = f"http://{value}" + parsed = urlsplit(value) + if not parsed.netloc: + raise UgreenLoginError(f"无效服务器地址: {raw}") + return urlunsplit((parsed.scheme, parsed.netloc, "", "", "")).rstrip("/") + + +def _json_or_raise(resp: requests.Response, stage: str) -> dict[str, Any]: + try: + data = resp.json() + except Exception as exc: # pragma: no cover - 网络异常路径 + raise UgreenLoginError( + f"{stage} 返回非 JSON,HTTP {resp.status_code},响应片段: {resp.text[:200]}" + ) from exc + if not isinstance(data, dict): + raise UgreenLoginError(f"{stage} 返回格式异常: {type(data).__name__}") + return data + + +def _decode_public_key(raw: str) -> str: + value = (raw or "").strip() + if not value: + raise UgreenLoginError("未获取到公钥") + if "BEGIN" in value: + return value + try: + return base64.b64decode(value).decode("utf-8") + except Exception as exc: + raise UgreenLoginError("公钥解码失败") from exc + + +def _raise_if_failed(payload: Mapping[str, Any], stage: str) -> None: + if payload.get("code") == 200: + return + raise UgreenLoginError( + f"{stage}失败: code={payload.get('code')} msg={payload.get('msg')}" + ) + + +def _build_common_headers( + client_id: str, client_version: str, language: str +) -> dict[str, str]: + return { + "Accept": "application/json, text/plain, */*", + "Client-Id": client_id, + "Client-Version": client_version, + "UG-Agent": "PC/WEB", + "X-Specify-Language": language, + } + + +def _login_and_get_access( + session: requests.Session, + base_url: str, + username: str, + password: str, + keepalive: bool, + headers: Mapping[str, str], + timeout: float, + verify_ssl: bool, +) -> tuple[str, str]: + check_resp = session.post( + f"{base_url}/ugreen/v1/verify/check", + json={"username": username}, + headers=dict(headers), + timeout=timeout, + verify=verify_ssl, + ) + check_json = _json_or_raise(check_resp, "获取登录公钥") + _raise_if_failed(check_json, "获取登录公钥") + + rsa_token = ( + check_resp.headers.get("x-rsa-token") + or check_resp.headers.get("X-Rsa-Token") + or check_json.get("xRsaToken") + or check_json.get("x-rsa-token") + ) + if not rsa_token: + data = check_json.get("data") + if isinstance(data, Mapping): + rsa_token = data.get("xRsaToken") or data.get("x-rsa-token") + if not rsa_token: + raise UgreenLoginError("登录公钥为空(x-rsa-token)") + + login_public_key = _decode_public_key(str(rsa_token)) + encrypted_password = UgreenCrypto(public_key=login_public_key).rsa_encrypt_long( + password + ) + + login_payload = { + "username": username, + "password": encrypted_password, + "keepalive": keepalive, + "otp": True, + "is_simple": True, + } + login_resp = session.post( + f"{base_url}/ugreen/v1/verify/login", + json=login_payload, + headers=dict(headers), + timeout=timeout, + verify=verify_ssl, + ) + login_json = _json_or_raise(login_resp, "登录") + _raise_if_failed(login_json, "登录") + + data = login_json.get("data") + if not isinstance(data, Mapping): + raise UgreenLoginError("登录成功但响应 data 为空") + + token = str(data.get("token") or "").strip() + public_key = str(data.get("public_key") or "").strip() + if not token: + raise UgreenLoginError("登录成功但未拿到 token") + if not public_key: + raise UgreenLoginError("登录成功但未拿到 public_key") + return token, _decode_public_key(public_key) + + +def _fetch_media_lib( + session: requests.Session, + base_url: str, + token: str, + public_key: str, + client_id: str, + client_version: str, + language: str, + page: int, + page_size: int, + timeout: float, + verify_ssl: bool, +) -> Any: + crypto = UgreenCrypto( + public_key=public_key, + token=token, + client_id=client_id, + client_version=client_version, + ug_agent="PC/WEB", + language=language, + ) + req = crypto.build_encrypted_request( + url=f"{base_url}/ugreen/v1/video/homepage/media_list", + method="GET", + params={"page": page, "page_size": page_size}, + ) + media_resp = session.get( + req.url, + headers=req.headers, + params=req.params, + timeout=timeout, + verify=verify_ssl, + ) + media_json = _json_or_raise(media_resp, "获取媒体库") + return crypto.decrypt_response(media_json, req.aes_key) + + +def parse_args(argv: list[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="登录绿联 NAS 并调用媒体库接口(自动处理请求加密/响应解密)" + ) + parser.add_argument("--host", help="服务器地址,例如: http://192.168.20.101:9999") + parser.add_argument("--username", help="用户名") + parser.add_argument("--password", help="密码(不传则交互输入)") + parser.add_argument("--client-id", help="可选,默认自动生成 UUID-WEB") + parser.add_argument("--client-version", default="76363", help="默认: 76363") + parser.add_argument("--language", default="zh-CN", help="默认: zh-CN") + parser.add_argument("--page", type=int, default=1, help="默认: 1") + parser.add_argument("--page-size", type=int, default=50, help="默认: 50") + parser.add_argument("--timeout", type=float, default=20.0, help="默认: 20 秒") + parser.add_argument("--insecure", action="store_true", help="忽略 HTTPS 证书校验") + parser.add_argument( + "--no-keepalive", + action="store_true", + help="关闭保持登录(默认保持登录)", + ) + parser.add_argument("--pretty", action="store_true", help="美化输出 JSON") + parser.add_argument("--output", help="将解密后的结果写入文件") + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv or sys.argv[1:]) + + host = args.host or input("服务器地址: ").strip() + username = args.username or input("用户名: ").strip() + password = args.password or getpass.getpass("密码: ") + client_id = (args.client_id or f"{uuid.uuid4()}-WEB").strip() + keepalive = not args.no_keepalive + verify_ssl = not args.insecure + + try: + base_url = _normalize_base_url(host) + if args.insecure: + requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined] + + session = requests.Session() + headers = _build_common_headers( + client_id=client_id, + client_version=args.client_version, + language=args.language, + ) + + token, public_key = _login_and_get_access( + session=session, + base_url=base_url, + username=username, + password=password, + keepalive=keepalive, + headers=headers, + timeout=args.timeout, + verify_ssl=verify_ssl, + ) + decoded = _fetch_media_lib( + session=session, + base_url=base_url, + token=token, + public_key=public_key, + client_id=client_id, + client_version=args.client_version, + language=args.language, + page=args.page, + page_size=args.page_size, + timeout=args.timeout, + verify_ssl=verify_ssl, + ) + + if isinstance(decoded, Mapping): + if decoded.get("code") != 200: + raise UgreenLoginError( + f"媒体库接口失败: code={decoded.get('code')} msg={decoded.get('msg')}" + ) + media_count = None + data = decoded.get("data") + if isinstance(data, Mapping) and isinstance(data.get("media_lib_info_list"), list): + media_count = len(data["media_lib_info_list"]) + print( + f"调用成功: code={decoded.get('code')} msg={decoded.get('msg')} " + f"media_lib_info_list={media_count}" + ) + + text = json.dumps( + decoded, + ensure_ascii=False, + indent=2 if args.pretty else None, + separators=(",", ":") if not args.pretty else None, + ) + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + f.write(text) + f.write("\n") + print(f"解密结果已写入: {args.output}") + else: + print(text) + return 0 + except UgreenLoginError as exc: + print(f"错误: {exc}", file=sys.stderr) + return 1 + except requests.RequestException as exc: + print(f"网络错误: {exc}", file=sys.stderr) + return 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_ugreen_mediaserver.py b/tests/test_ugreen_mediaserver.py new file mode 100644 index 00000000..d0631a77 --- /dev/null +++ b/tests/test_ugreen_mediaserver.py @@ -0,0 +1,176 @@ +import unittest +from unittest.mock import patch +import importlib.util +import sys +import types +from pathlib import Path + +from app import schemas + +try: + from app.api.endpoints import dashboard as dashboard_endpoint +except Exception: + dashboard_endpoint = None + + +def _load_ugreen_class(): + """ + 在测试中动态加载 Ugreen,避免受可选依赖(如 pyquery/sqlalchemy)影响。 + """ + module_name = "_test_ugreen_module" + if module_name in sys.modules: + return sys.modules[module_name].Ugreen + + # 轻量日志桩 + if "app.log" not in sys.modules: + log_module = types.ModuleType("app.log") + + class _Logger: + def info(self, *_args, **_kwargs): + pass + + def warning(self, *_args, **_kwargs): + pass + + def error(self, *_args, **_kwargs): + pass + + def debug(self, *_args, **_kwargs): + pass + + log_module.logger = _Logger() + sys.modules["app.log"] = log_module + + # SystemConfigOper 桩 + if "app.db.systemconfig_oper" not in sys.modules: + db_module = types.ModuleType("app.db.systemconfig_oper") + + class _SystemConfigOper: + @staticmethod + def get(_key): + return {} + + @staticmethod + def set(_key, _value): + return None + + db_module.SystemConfigOper = _SystemConfigOper + sys.modules["app.db.systemconfig_oper"] = db_module + + # app.modules / app.modules.ugreen / app.modules.ugreen.api 桩 + if "app.modules" not in sys.modules: + pkg = types.ModuleType("app.modules") + pkg.__path__ = [] + sys.modules["app.modules"] = pkg + if "app.modules.ugreen" not in sys.modules: + subpkg = types.ModuleType("app.modules.ugreen") + subpkg.__path__ = [] + sys.modules["app.modules.ugreen"] = subpkg + if "app.modules.ugreen.api" not in sys.modules: + api_module = types.ModuleType("app.modules.ugreen.api") + + class _Api: + host = "" + token = None + + api_module.Api = _Api + sys.modules["app.modules.ugreen.api"] = api_module + + ugreen_path = Path(__file__).resolve().parents[1] / "app" / "modules" / "ugreen" / "ugreen.py" + spec = importlib.util.spec_from_file_location(module_name, ugreen_path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + assert spec and spec.loader + spec.loader.exec_module(module) + return module.Ugreen + + +Ugreen = _load_ugreen_class() + + +class _FakeUgreenApi: + host = "http://127.0.0.1:9999" + token = "test-token" + + @staticmethod + def video_all(classification: int, page: int = 1, page_size: int = 1): + if classification == -102: + return {"total_num": 12} + if classification == -103: + return {"total_num": 34} + return {"total_num": 0} + + +class UgreenScanModeTest(unittest.TestCase): + def test_resolve_scan_type(self): + resolve = Ugreen._Ugreen__resolve_scan_type + + self.assertEqual(resolve(scan_mode="new_and_modified"), 1) + self.assertEqual(resolve(scan_mode="supplement_missing"), 2) + self.assertEqual(resolve(scan_mode="full_override"), 3) + + self.assertEqual(resolve(scan_mode="1"), 1) + self.assertEqual(resolve(scan_mode="2"), 2) + self.assertEqual(resolve(scan_mode="3"), 3) + + self.assertEqual(resolve(scan_type=1), 1) + self.assertEqual(resolve(scan_type=2), 2) + self.assertEqual(resolve(scan_type=3), 3) + + self.assertEqual(resolve(scan_mode="unknown"), 2) + self.assertEqual(resolve(), 2) + + +class UgreenStatisticTest(unittest.TestCase): + def test_get_medias_count_episode_is_none(self): + ugreen = Ugreen.__new__(Ugreen) + ugreen._host = "http://127.0.0.1:9999" + ugreen._username = "tester" + ugreen._password = "secret" + ugreen._userinfo = {"name": "tester"} + ugreen._api = _FakeUgreenApi() + + stat = ugreen.get_medias_count() + self.assertEqual(stat.movie_count, 12) + self.assertEqual(stat.tv_count, 34) + self.assertIsNone(stat.episode_count) + + +class DashboardStatisticTest(unittest.TestCase): + @unittest.skipIf(dashboard_endpoint is None, "dashboard endpoint dependencies are missing") + def test_statistic_all_episode_missing(self): + mocked_stats = [ + schemas.Statistic(movie_count=10, tv_count=20, episode_count=None, user_count=2), + schemas.Statistic(movie_count=1, tv_count=2, episode_count=None, user_count=1), + ] + with patch( + "app.api.endpoints.dashboard.DashboardChain.media_statistic", + return_value=mocked_stats, + ): + ret = dashboard_endpoint.statistic(name="ugreen", _=None) + + self.assertEqual(ret.movie_count, 11) + self.assertEqual(ret.tv_count, 22) + self.assertEqual(ret.user_count, 3) + self.assertIsNone(ret.episode_count) + + @unittest.skipIf(dashboard_endpoint is None, "dashboard endpoint dependencies are missing") + def test_statistic_mixed_episode_count(self): + mocked_stats = [ + schemas.Statistic(movie_count=10, tv_count=20, episode_count=None, user_count=2), + schemas.Statistic(movie_count=1, tv_count=2, episode_count=6, user_count=1), + ] + with patch( + "app.api.endpoints.dashboard.DashboardChain.media_statistic", + return_value=mocked_stats, + ): + ret = dashboard_endpoint.statistic(name="all", _=None) + + self.assertEqual(ret.movie_count, 11) + self.assertEqual(ret.tv_count, 22) + self.assertEqual(ret.user_count, 3) + self.assertEqual(ret.episode_count, 6) + + +if __name__ == "__main__": + unittest.main()