diff --git a/app/modules/trimemedia/__init__.py b/app/modules/trimemedia/__init__.py new file mode 100644 index 00000000..e86434e7 --- /dev/null +++ b/app/modules/trimemedia/__init__.py @@ -0,0 +1,365 @@ +import re +from typing import Any, Generator, List, Optional, Tuple, Union + +from app import schemas +from app.core.context import MediaInfo +from app.core.event import eventmanager +from app.log import logger +from app.modules import _MediaServerBase, _ModuleBase +from app.modules.trimemedia.trimemedia import TrimeMedia +from app.schemas import AuthCredentials, AuthInterceptCredentials +from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType + + +class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]): + + def init_module(self) -> None: + """ + 初始化模块 + """ + super().init_service( + service_name=TrimeMedia.__name__.lower(), + service_type=lambda conf: TrimeMedia( + **conf.config, sync_libraries=conf.sync_libraries + ), + ) + + @staticmethod + def get_name() -> str: + return "飞牛影视" + + @staticmethod + def get_type() -> ModuleType: + """ + 获取模块类型 + """ + return ModuleType.MediaServer + + @staticmethod + def get_subtype() -> MediaServerType: + """ + 获取模块子类型 + """ + return MediaServerType.TrimeMedia + + @staticmethod + def get_priority() -> int: + """ + 获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效 + """ + return 4 + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def scheduler_job(self) -> None: + """ + 定时任务,每10分钟调用一次 + """ + # 定时重连 + for name, server in self.get_instances().items(): + if server.is_configured() and server.is_inactive(): + logger.info(f"飞牛影视 {name} 连接断开,尝试重连 ...") + server.reconnect() + + def stop(self): + pass + + def test(self) -> Optional[Tuple[bool, str]]: + """ + 测试模块连接性 + """ + if not self.get_instances(): + return None + for name, server in self.get_instances().items(): + if not server.is_configured(): + return False, f"飞牛影视配置不完整:{name}" + if server.is_inactive() and server.reconnect() != True: + return False, f"无法连接飞牛影视:{name}" + return True, "" + + def user_authenticate( + self, credentials: AuthCredentials, service_name: Optional[str] = None + ) -> Optional[AuthCredentials]: + """ + 使用飞牛影视用户辅助完成用户认证 + + :param credentials: 认证数据 + :param service_name: 指定要认证的媒体服务器名称,若为 None 则认证所有服务 + :return: 认证数据 + """ + # 飞牛影视认证 + if not credentials or credentials.grant_type != "password": + return None + # 确定要认证的服务器列表 + if service_name: + # 如果指定了服务名,获取该服务实例 + servers = ( + [(service_name, server)] + if (server := self.get_instance(service_name)) + else [] + ) + else: + # 如果没有指定服务名,遍历所有服务 + servers = self.get_instances().items() + # 遍历要认证的服务器 + for name, server in servers: + # 触发认证拦截事件 + intercept_event = eventmanager.send_event( + etype=ChainEventType.AuthIntercept, + data=AuthInterceptCredentials( + username=credentials.username, + channel=self.get_name(), + service=name, + status="triggered", + ), + ) + if intercept_event and intercept_event.event_data: + intercept_data: AuthInterceptCredentials = intercept_event.event_data + if intercept_data.cancel: + continue + token = server.authenticate(credentials.username, credentials.password) + if token: + credentials.channel = self.get_name() + credentials.service = name + credentials.token = token + return credentials + return None + + def webhook_parser( + self, body: Any, form: Any, args: Any + ) -> Optional[schemas.WebhookEventInfo]: + """ + 解析Webhook报文体 + + :param body: 请求体 + :param form: 请求表单 + :param args: 请求参数 + :return: 字典,解析为消息时需要包含:title、text、image + """ + source = args.get("source") + if source: + server: TrimeMedia = self.get_instance(source) + if not server: + return None + result = server.get_webhook_message(body) + if result: + result.server_name = source + return result + + for server in self.get_instances().values(): + if server: + result = server.get_webhook_message(body) + if result: + return result + return None + + def media_exists( + self, + mediainfo: MediaInfo, + itemid: Optional[str] = None, + server: Optional[str] = None, + ) -> Optional[schemas.ExistMediaInfo]: + """ + 判断媒体文件是否存在 + + :param mediainfo: 识别的媒体信息 + :param itemid: 媒体服务器ItemID + :param server: 媒体服务器名称 + :return: 如不存在返回None,存在时返回信息,包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}} + """ + if server: + servers = [(server, self.get_instance(server))] + else: + servers = self.get_instances().items() + for name, s in servers: + if not server: + continue + if mediainfo.type == MediaType.MOVIE: + if itemid: + movie = s.get_iteminfo(itemid) + if movie: + logger.info(f"媒体库 {name} 中找到了 {movie}") + return schemas.ExistMediaInfo( + type=MediaType.MOVIE, + server_type="trimemedia", + server=name, + itemid=movie.item_id, + ) + movies = s.get_movies( + title=mediainfo.title, + year=mediainfo.year, + tmdb_id=mediainfo.tmdb_id, + ) + if not movies: + logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中") + continue + else: + logger.info(f"媒体库 {name} 中找到了 {movies}") + return schemas.ExistMediaInfo( + type=MediaType.MOVIE, + server_type="trimemedia", + server=name, + itemid=movies[0].item_id, + ) + else: + itemid, tvs = s.get_tv_episodes( + title=mediainfo.title, + year=mediainfo.year, + tmdb_id=mediainfo.tmdb_id, + item_id=itemid, + ) + if not tvs: + logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中") + continue + else: + logger.info( + f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}" + ) + return schemas.ExistMediaInfo( + type=MediaType.TV, + seasons=tvs, + server_type="trimemedia", + server=name, + itemid=itemid, + ) + return None + + def media_statistic( + self, server: Optional[str] = None + ) -> Optional[List[schemas.Statistic]]: + """ + 媒体数量统计 + """ + if server: + server_obj: TrimeMedia = self.get_instance(server) + if not server_obj: + return None + servers = [server_obj] + else: + servers = self.get_instances().values() + media_statistics = [] + for s in servers: + media_statistic = s.get_medias_count() + if not media_statistic: + continue + media_statistic.user_count = s.get_user_count() + media_statistics.append(media_statistic) + return media_statistics + + def mediaserver_librarys( + self, server: Optional[str] = None, hidden: Optional[bool] = False, **kwargs + ) -> Optional[List[schemas.MediaServerLibrary]]: + """ + 媒体库列表 + """ + server_obj: TrimeMedia = self.get_instance(server) + if server_obj: + return server_obj.get_librarys(hidden=hidden) + return None + + def mediaserver_items( + self, + server: str, + library_id: Union[str, int], + start_index: Optional[int] = 0, + limit: Optional[int] = -1, + ) -> Optional[Generator]: + """ + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 + """ + server_obj: TrimeMedia = self.get_instance(server) + if server_obj: + return server_obj.get_items(library_id, start_index, limit) + return None + + def mediaserver_iteminfo( + self, server: str, item_id: str + ) -> Optional[schemas.MediaServerItem]: + """ + 媒体库项目详情 + """ + server_obj: TrimeMedia = self.get_instance(server) + if server_obj: + return server_obj.get_iteminfo(item_id) + return None + + def mediaserver_tv_episodes( + self, server: str, item_id: Union[str, int] + ) -> Optional[List[schemas.MediaServerSeasonInfo]]: + """ + 获取剧集信息 + """ + server_obj: TrimeMedia = self.get_instance(server) + if not server_obj: + return None + _, seasoninfo = server_obj.get_tv_episodes(item_id=item_id) + if not seasoninfo: + return [] + return [ + schemas.MediaServerSeasonInfo(season=season, episodes=episodes) + for season, episodes in seasoninfo.items() + ] + + def mediaserver_playing( + self, server: str, count: Optional[int] = 20, **kwargs + ) -> List[schemas.MediaServerPlayItem]: + """ + 获取媒体服务器正在播放信息 + """ + server_obj: TrimeMedia = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_resume(num=count) + + def mediaserver_play_url( + self, server: str, item_id: Union[str, int] + ) -> Optional[str]: + """ + 获取媒体库播放地址 + """ + server_obj: TrimeMedia = self.get_instance(server) + if not server_obj: + return None + return server_obj.get_play_url(item_id) + + def mediaserver_latest( + self, + server: Optional[str] = None, + count: Optional[int] = 20, + **kwargs, + ) -> List[schemas.MediaServerPlayItem]: + """ + 获取媒体服务器最新入库条目 + """ + server_obj: TrimeMedia = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_latest(num=count) + + def mediaserver_latest_images( + self, + server: Optional[str] = None, + count: Optional[int] = 20, + remote: Optional[bool] = False, + **kwargs, + ) -> List[str]: + """ + 获取媒体服务器最新入库条目的图片 + + :param server: 媒体服务器名称 + :param count: 获取数量 + :param remote: True为外网链接, False为内网链接 + :return: 图片链接列表 + """ + server_obj: TrimeMedia = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_latest_backdrops(num=count, remote=remote) diff --git a/app/modules/trimemedia/api.py b/app/modules/trimemedia/api.py new file mode 100644 index 00000000..3a2c9907 --- /dev/null +++ b/app/modules/trimemedia/api.py @@ -0,0 +1,446 @@ +import hashlib +import json +import random +import time +from dataclasses import dataclass +from enum import Enum +from typing import Optional, Union + +from app.core.config import settings +from app.log import logger +from app.utils.http import RequestUtils, requests + + +@dataclass +class User: + guid: str + username: str + is_admin: int = 0 + + +class Category(Enum): + Movie = "Movie" + TV = "TV" + Mix = "Mix" + Others = "Others" + + @classmethod + def _missing_(self, value): + return self.Others + + +class Type(Enum): + Movie = "Movie" + TV = "TV" + Season = "Season" + Episode = "Episode" + Video = "Video" + Directory = "Directory" + + @classmethod + def _missing_(self, value): + return self.Video + + +@dataclass +class MediaDb: + guid: str + category: Category + name: Optional[str] = None + posters: Optional[list[str]] = None + dir_list: Optional[list[str]] = None + + +@dataclass +class MediaDbSumary: + favorite: int = 0 + movie: int = 0 + tv: int = 0 + video: int = 0 + total: int = 0 + + +@dataclass +class Item: + guid: str + ancestor_guid: str = "" + type: Optional[Type] = None + # 当type为Episode时是剧名,parent_title是季名,title作为分集名称 + tv_title: Optional[str] = None + parent_title: Optional[str] = None + title: Optional[str] = None + original_title: Optional[str] = None + overview: Optional[str] = None + poster: Optional[str] = None + backdrops: Optional[str] = None + posters: Optional[str] = None + douban_id: Optional[int] = None + imdb_id: Optional[str] = None + trim_id: Optional[str] = None + release_date: Optional[str] = None + air_date: Optional[str] = None + vote_average: Optional[str] = None + season_number: Optional[int] = None + episode_number: Optional[int] = None + duration: Optional[int] = None # 片长(秒) + ts: Optional[int] = None # 已播放(秒) + watched: Optional[int] = None # 1:已看完 + + @property + def tmdb_id(self) -> Optional[int]: + if self.trim_id is None: + return None + if self.trim_id.startswith("tt") or self.trim_id.startswith("tm"): + # 飞牛给tmdbid加了前缀用以区分tv或movie + return int(self.trim_id[2:]) + return None + + +class Api: + __slots__ = ( + "_host", + "_token", + "_apikey", + "_api_path", + "_request_utils", + ) + + @property + def token(self) -> Optional[str]: + return self._token + + @property + def host(self) -> str: + return self._host + + @property + def apikey(self) -> str: + return self._apikey + + def __init__(self, host: str, apikey: str): + self._api_path = "/v/api/v1" + self._host = host.rstrip("/") + self._apikey = apikey + self._token = None + self._request_utils = RequestUtils(session=requests.Session()) + + def login(self, username, password) -> Optional[str]: + """ + 登录飞牛影视 + + :return: 成功返回token 否则返回None + """ + if ( + res := self.__request_api( + "/login", + data={ + "username": username, + "password": password, + "app_name": "trimemedia-web", + }, + ) + ) and res.success: + self._token = res.data.get("token") + return self._token + + def logout(self) -> bool: + """ + 退出账号 + """ + if (res := self.__request_api("/user/logout", method="post")) and res.success: + if res.data == True: + self._token = None + return True + return False + + def user_list(self) -> Optional[list[User]]: + """ + 用户列表(仅管理员有权访问) + """ + if (res := self.__request_api("/manager/user/list")) and res.success: + return [ + User( + guid=info.get("guid"), + username=info.get("username"), + is_admin=info.get("is_admin", 0), + ) + for info in res.data + ] + return None + + def user_info(self) -> Optional[User]: + """ + 当前用户信息 + """ + if (res := self.__request_api("/user/info")) and res.success: + user = User("", "") + user.__dict__.update(res.data) + return user + return None + + def mediadb_sum(self) -> Optional[MediaDbSumary]: + """ + 媒体数量统计 + """ + if (res := self.__request_api("/mediadb/sum")) and res.success: + sum = MediaDbSumary() + sum.__dict__.update(res.data) + return sum + return None + + def mediadb_list(self) -> Optional[MediaDbSumary]: + """ + 媒体库列表(普通用户) + """ + if (res := self.__request_api("/mediadb/list")) and res.success: + items = [] + for info in res.data: + mdb = MediaDb( + guid=info.get("guid"), + category=Category(info.get("category")), + name=info.get("title", ""), + posters=[ + self.__build_img_api_url(poster) + for poster in info.get("posters", []) + ], + ) + items.append(mdb) + return items + return None + + def __build_img_api_url(self, img_path: Optional[str]) -> Optional[str]: + if not img_path: + return None + if img_path[0] != "/": + img_path = "/" + img_path + return f"{self._api_path}/sys/img{img_path}" + + def mdb_list(self) -> Optional[list[MediaDb]]: + """ + 媒体库列表(管理员) + """ + if (res := self.__request_api("/mdb/list")) and res.success: + items = [] + for info in res.data: + mdb = MediaDb( + guid=info.get("guid"), + category=Category(info.get("category")), + name=info.get("name", ""), + posters=[ + self.__build_img_api_url(poster) + for poster in info.get("posters", []) + ], + dir_list=info.get("dir_list"), + ) + items.append(mdb) + return items + return None + + def mdb_scanall(self) -> bool: + """ + 扫描所有媒体库 + """ + if (res := self.__request_api("/mdb/scanall", method="post")) and res.success: + if res.data == True: + self._token = None + return True + return False + + def mdb_scan(self, mdb: MediaDb) -> bool: + """ + 扫描指定媒体库 + """ + if ( + res := self.__request_api(f"/mdb/scan/{mdb.guid}", data={}) + ) and res.success: + if res.data == True: + self._token = None + return True + return False + + def __build_item(self, info: dict) -> Item: + """ + 构造媒体Item + """ + item = Item(guid="") + item.__dict__.update(info) + item.type = Type(info.get("type")) + # Item详情接口才有posters和backdrops + item.posters = self.__build_img_api_url(item.posters) + item.backdrops = self.__build_img_api_url(item.backdrops) + item.poster = ( + self.__build_img_api_url(item.poster) if item.poster else item.posters + ) + return item + + def item_list( + self, + guid: Optional[str] = None, + type: Optional[list[Type]] = [Type.Movie, Type.TV, Type.Directory, Type.Video], + exclude_grouped_video=True, + page=1, + page_size=22, + sort_by="create_time", + sort="DESC", + ) -> Optional[list[Item]]: + """ + 媒体列表 + """ + post = { + "tags": {"type": type} if type else {}, + "sort_type": sort, + "sort_column": sort_by, + "page": page, + "page_size": page_size, + } + if guid: + post["ancestor_guid"] = guid + if exclude_grouped_video: + post["exclude_grouped_video"] = 1 + + if (res := self.__request_api("/item/list", data=post)) and res.success: + return [self.__build_item(info) for info in res.data.get("list", [])] + return None + + def search_list(self, keywords: str) -> Optional[list[Item]]: + """ + 搜索影片、演员 + """ + if ( + res := self.__request_api("/search/list", params={"q": keywords}) + ) and res.success: + return [self.__build_item(info) for info in res.data] + return None + + def item(self, guid: str) -> Optional[Item]: + """ """ + if (res := self.__request_api(f"/item/{guid}")) and res.success: + return self.__build_item(res.data) + return None + + def season_list(self, tv_guid: str) -> Optional[list[Item]]: + """ """ + if (res := self.__request_api(f"/season/list/{tv_guid}")) and res.success: + return [self.__build_item(info) for info in res.data] + return None + + def episode_list(self, season_guid: str) -> Optional[list[Item]]: + """ """ + if (res := self.__request_api(f"/episode/list/{season_guid}")) and res.success: + return [self.__build_item(info) for info in res.data] + return None + + def play_list(self) -> Optional[list[Item]]: + """ + 继续观看列表 + """ + if (res := self.__request_api("/play/list")) and res.success: + return [self.__build_item(info) for info in res.data] + return None + + ################################################################ + + def __get_authx(self, api_path, body: Optional[str]): + """ + 计算消息签名 + """ + if api_path[0] != "/": + api_path = "/" + api_path + nonce = str(random.randint(100000, 999999)) + ts = str(int(time.time() * 1000)) + md5 = hashlib.md5() + md5.update((body or "").encode()) + data_hash = md5.hexdigest() + md5 = hashlib.md5() + md5.update( + "_".join( + [ + "NDzZTVxnRKP8Z0jXg1VAMonaG8akvh", + api_path, + nonce, + ts, + data_hash, + self._apikey, + ] + ).encode() + ) + sign = md5.hexdigest() + return f"nonce={nonce}×tamp={ts}&sign={sign}" + + def __request_api( + self, api: str, method: str = None, params: dict = None, data: dict = None + ): + """ + 请求飞牛影视API + """ + + @dataclass + class Result: + @property + def success(self) -> bool: + return code == 0 + + code: int + msg: Optional[str] = None + data: Optional[Union[dict, list, str, bool]] = None + + class JsonEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, Type): + return obj.value + return super().default(self, obj) + + if not self._host or not api: + return None + if api[0] != "/": + api = "/" + api + api_path = self._api_path + api + url = self._host + api_path + if method is None: + method = "get" if data is None else "post" + if method == "post": + json_body = ( + json.dumps(data, allow_nan=False, cls=JsonEncoder) if data else "" + ) + else: + json_body = None + headers = { + "User-Agent": settings.USER_AGENT, + "Authorization": self._token, + "authx": self.__get_authx(api_path, json_body), + } + if json_body is not None: + headers["Content-Type"] = "application/json" + try: + res = self._request_utils.request( + method=method, url=url, headers=headers, params=params, data=json_body + ) + if res: + resp = res.json() + msg = resp.get("msg") + if code := int(resp.get("code", -1)): + logger.error(f"请求接口 {api_path} 失败,错误码:{code} {msg}") + return Result(code, msg) + return Result(0, msg, resp.get("data")) + else: + logger.error(f"请求接口 {api_path} 失败") + except Exception as e: + logger.error(f"请求接口 {api_path} 异常:" + str(e)) + return None + + +if __name__ == "__main__": + api = Api("http://192.168.1.49:5666/", "16CCEB3D-AB42-077D-36A1-F355324E4237") + api.login("adad", "123456") + logger.debug(f"token={api.token}") + + user = api.user_info() + logger.debug(user) + + mediadbs = api.mdb_list() + logger.debug(mediadbs) + + items = api.item_list(mediadbs[0].guid, page=1, page_size=0) + logger.debug(items) + + api.logout() diff --git a/app/modules/trimemedia/trimemedia.py b/app/modules/trimemedia/trimemedia.py new file mode 100644 index 00000000..8bbbbc86 --- /dev/null +++ b/app/modules/trimemedia/trimemedia.py @@ -0,0 +1,550 @@ +import json +from pathlib import Path +from typing import Any, Dict, Generator, List, Optional, Tuple, Union + +import app.modules.trimemedia.api as fnapi +from app import schemas +from app.log import logger +from app.schemas import MediaType +from app.utils.url import UrlUtils + + +class TrimeMedia: + _username: Optional[str] = None + _password: Optional[str] = None + + _userinfo: Optional[fnapi.User] = None + _playhost: Optional[str] = None + + _libraries: dict[str, fnapi.MediaDb] = {} + _sync_libraries: List[str] = [] + + _api: Optional[fnapi.Api] = None + + def __init__( + self, + host: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + play_host: Optional[str] = None, + sync_libraries: list = None, + **kwargs, + ): + if not host or not username or not password: + logger.error("飞牛影视配置不完整!!") + return + host = UrlUtils.standardize_base_url(host).rstrip("/") + if play_host: + self._playhost = UrlUtils.standardize_base_url(play_host).rstrip("/") + self._username = username + self._password = password + self._sync_libraries = sync_libraries or [] + self._api = fnapi.Api(host, apikey="16CCEB3D-AB42-077D-36A1-F355324E4237") + self.reconnect() + + def is_configured(self) -> bool: + return self._api is not None + + def is_authenticated(self) -> bool: + return self.is_configured() and self._api.token is not None + + def is_inactive(self) -> bool: + """ + 判断是否需要重连 + """ + if not self.is_authenticated(): + return True + self._userinfo = self._api.user_info() + return self._userinfo is None + + def reconnect(self): + """ + 重连 + """ + if not self.is_configured(): + return False + if self._api.login(self._username, self._password) is None: + return False + self._userinfo = self._api.user_info() + if self._userinfo is None: + return False + logger.debug(f"{self._userinfo.username} 成功登录飞牛影视") + return True + + def get_librarys( + self, hidden: Optional[bool] = False + ) -> List[schemas.MediaServerLibrary]: + """ + 获取媒体服务器所有媒体库列表 + """ + if not self.is_authenticated(): + return [] + if self._userinfo.is_admin == 1: + mdb_list = self._api.mdb_list() or [] + else: + mdb_list = self._api.mediadb_list() or [] + self._libraries = {lib.guid: lib for lib in mdb_list} + libraries = [] + for library in self._libraries.values(): + if hidden and self.__is_library_blocked(library.guid): + continue + if library.category == fnapi.Category.Movie: + library_type = MediaType.MOVIE.value + elif library.category == fnapi.Category.TV: + library_type = MediaType.TV.value + elif library.category == fnapi.Category.Others: + # 忽略这个库 + continue + else: + library_type = MediaType.UNKNOWN.value + libraries.append( + schemas.MediaServerLibrary( + server="trimemedia", + id=library.guid, + name=library.name, + type=library_type, + path=library.dir_list, + image_list=[ + f"{self._api.host}{img_path}?w=256" + for img_path in library.posters or [] + ], + link=f"{self._playhost or self._api.host}/v/library/{library.guid}", + ) + ) + return libraries + + def get_user_count(self) -> int: + """ + 获取用户数量(非管理员不能调用) + """ + if not self.is_authenticated(): + return 0 + if not self._userinfo or self._userinfo.is_admin != 1: + return 0 + return len(self._api.user_list() or []) + + def get_medias_count(self) -> schemas.Statistic: + """ + 获取媒体数量 + + :return: MovieCount SeriesCount + """ + if not self.is_authenticated(): + return schemas.Statistic() + if (info := self._api.mediadb_sum()) is None: + return schemas.Statistic() + return schemas.Statistic( + movie_count=info.movie, + tv_count=info.tv, + ) + + def authenticate(self, username: str, password: str) -> Optional[str]: + """ + 用户认证 + + :param username: 用户名 + :param password: 密码 + :return: 认证成功返回token,否则返回None + """ + if not username or not password: + return None + if not self.is_configured(): + return None + feiniu = fnapi.Api(self._api.host, self._api.apikey) + if token := feiniu.login(username, password): + feiniu.logout() + return token + + def get_movies( + self, title: str, year: Optional[str] = None, tmdb_id: Optional[int] = None + ) -> Optional[List[schemas.MediaServerItem]]: + """ + 根据标题和年份,检查电影是否在飞牛中存在,存在则返回列表 + + :param title: 标题 + :param year: 年份,为空则不过滤 + :param tmdb_id: TMDB ID + :return: 含title、year属性的字典列表 + """ + if not self.is_authenticated(): + return None + movies = [] + items = self._api.search_list(keywords=title) or [] + for item in items: + if item.type != fnapi.Type.Movie: + continue + if ( + (not tmdb_id or tmdb_id == item.tmdb_id) + and title in [item.title, item.original_title] + and (not year or (item.release_date and item.release_date[:4] == year)) + ): + movies.append(self.__build_media_server_item(item)) + return movies + + def __get_series_id_by_name(self, name: str, year: str) -> Optional[str]: + items = self._api.search_list(keywords=name) or [] + for item in items: + if item.type != fnapi.Type.TV: + continue + # 可惜搜索接口不下发original_title 也不能指定分类、年份 + if name in [item.title, item.original_title]: + if not year or (item.air_date and item.air_date[:4] == year): + return item.guid + return None + + def get_tv_episodes( + self, + item_id: Optional[str] = None, + title: Optional[str] = None, + year: Optional[str] = None, + tmdb_id: Optional[int] = None, + season: Optional[int] = None, + ) -> Tuple[Optional[str], Optional[Dict[int, list]]]: + """ + 根据标题和年份和季,返回飞牛中的剧集列表 + + :param item_id: 飞牛影视中的guid + :param title: 标题 + :param year: 年份 + :param tmdb_id: TMDBID + :param season: 季 + :return: 集号的列表 + """ + if not self.is_authenticated(): + return None, None + + if not item_id: + item_id = self.__get_series_id_by_name(title, year) + if item_id is None: + return None, None + + item_info = self.get_iteminfo(item_id) + if not item_info: + return None, {} + + if tmdb_id and item_info.tmdbid: + if tmdb_id != item_info.tmdbid: + return None, {} + + seasons = self._api.season_list(item_id) + if not seasons: + # 季列表获取失败 + return None, {} + + if season is not None: + for item in seasons: + if item.season_number == season: + seasons = [item] + break + else: + # 没有匹配的季 + return None, {} + + season_episodes = {} + for item in seasons: + episodes = self._api.episode_list(item.guid) + for episode in episodes or []: + if episode.season_number not in season_episodes: + season_episodes[episode.season_number] = [] + season_episodes[episode.season_number].append(episode.episode_number) + return item_id, season_episodes + + def refresh_root_library(self) -> Optional[bool]: + """ + 通知飞牛刷新整个媒体库(非管理员不能调用) + """ + if not self.is_authenticated(): + return None + if not self._userinfo or self._userinfo.is_admin != 1: + logger.error("飞牛仅支持管理员账号刷新媒体库") + return False + + logger.info("刷新所有媒体库") + return self._api.mdb_scanall() + + def refresh_library_by_items( + self, items: List[schemas.RefreshMediaItem] + ) -> Optional[bool]: + """ + 按路径刷新所在的媒体库(非管理员不能调用) + + :param items: 已识别的需要刷新媒体库的媒体信息列表 + """ + if not self.is_authenticated(): + return None + if not self._userinfo or self._userinfo.is_admin != 1: + logger.error("飞牛仅支持管理员账号刷新媒体库") + return False + + libraries = set() + for item in items: + lib = self.__match_library_by_path(item.target_path) + if lib is None: + # 如果有匹配失败的,刷新整个库 + return self._api.mdb_scanall() + # 媒体库去重 + libraries.add(lib.guid) + + for lib_guid in libraries: + # 逐个刷新 + lib = self._libraries[lib_guid] + logger.info(f"刷新媒体库:{lib.name}") + if not self._api.mdb_scan(lib): + # 如果失败,刷新整个库 + return self._api.mdb_scanall() + return True + + def __match_library_by_path(self, path: Path) -> Optional[fnapi.MediaDb]: + def is_subpath(_path: Path, _parent: Path) -> bool: + """ + 判断_path是否是_parent的子目录下 + """ + _path = _path.resolve() + _parent = _parent.resolve() + return _path.parts[: len(_parent.parts)] == _parent.parts + + if path is None: + return None + for lib in self._libraries.values(): + for dir in lib.dir_list or []: + if is_subpath(path, Path(dir)): + return lib + return None + + # TODO 飞牛似乎还没有这个功能 + def get_webhook_message(self, body: any) -> Optional[schemas.WebhookEventInfo]: + return None + + def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]: + """ + 获取单个项目详情 + """ + if not self.is_authenticated(): + return None + if item := self._api.item(guid=itemid): + return self.__build_media_server_item(item) + return None + + @staticmethod + def __build_media_server_item(item: fnapi.Item): + if item.air_date and item.type == fnapi.Type.TV: + year = item.air_date[:4] + elif item.release_date: + year = item.release_date[:4] + else: + year = None + + user_state = schemas.MediaServerItemUserState() + if item.watched: + user_state.played = True + if item.duration and item.ts is not None: + user_state.percentage = item.ts / item.duration + user_state.resume = True + if item.type is None: + item_type = None + else: + # 将飞牛的媒体类型转为MP能识别的 + item_type = "Series" if item.type == fnapi.Type.TV else item.type.value + return schemas.MediaServerItem( + server="trimemedia", + library=item.ancestor_guid, + item_id=item.guid, + item_type=item_type, + title=item.title, + original_title=item.original_title, + year=year, + tmdbid=item.tmdb_id, + imdbid=item.imdb_id, + user_state=user_state, + ) + + @staticmethod + def __build_play_url(host: str, item: fnapi.Item) -> str: + """ + 拼装播放链接 + """ + if item.type == fnapi.Type.Episode: + return f"{host}/v/tv/episode/{item.guid}" + elif item.type == fnapi.Type.Season: + return f"{host}/v/tv/season/{item.guid}" + elif item.type == fnapi.Type.Movie: + return f"{host}/v/movie/{item.guid}" + elif item.type == fnapi.Type.TV: + return f"{host}/v/tv/{item.guid}" + else: + # 其它类型走通用页面,由飞牛来判断 + return f"{host}/v/other/{item.guid}" + + def __build_media_server_play_item( + self, item: fnapi.Item + ) -> schemas.MediaServerPlayItem: + """ + :params use_backdrop: 是否优先使用Backdrop类型的图片 + """ + if item.type == fnapi.Type.Episode: + title = item.tv_title + subtitle = f"S{item.season_number}:{item.episode_number} - {item.title}" + else: + title = item.title + subtitle = "电影" if item.type == fnapi.Type.Movie else "视频" + type = ( + MediaType.MOVIE.value + if item.type in [fnapi.Type.Movie, fnapi.Type.Video] + else MediaType.TV.value + ) + return schemas.MediaServerPlayItem( + id=item.guid, + title=title, + subtitle=subtitle, + type=type, + image=f"{self._api.host}{item.poster}", + link=self.__build_play_url(self._playhost or self._api.host, item), + percent=( + item.ts / item.duration * 100.0 + if item.duration and item.ts is not None + else 0 + ), + ) + + def get_items( + self, + parent: Union[str, int], + start_index: Optional[int] = 0, + limit: Optional[int] = -1, + ) -> Generator[schemas.MediaServerItem | None | Any, Any, None]: + """ + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 + """ + if not self.is_authenticated(): + return None + if (SIZE := limit) is None: + SIZE = -1 + items = ( + self._api.item_list( + guid=parent, + page=start_index + 1, + page_size=SIZE, + type=[fnapi.Type.Movie, fnapi.Type.TV, fnapi.Type.Directory], + ) + or [] + ) + for item in items: + if item.type == fnapi.Type.Directory: + for items in self.get_items(parent=item.guid): + yield items + elif item.type in [fnapi.Type.Movie, fnapi.Type.TV]: + yield self.__build_media_server_item(item) + return None + + def get_play_url(self, item_id: str) -> str: + """ + 获取媒体的外网播放链接 + + :param item_id: 媒体ID + """ + if not self.is_authenticated(): + return None + if (item := self._api.item(item_id)) is None: + return None + # 根据查询到的信息拼装出播放链接 + return self.__build_play_url(self._playhost or self._api.host, item) + + def get_resume( + self, num: Optional[int] = 12 + ) -> Optional[List[schemas.MediaServerPlayItem]]: + """ + 获取继续观看列表 + + :param num: 列表大小,None不限制数量 + """ + if not self.is_authenticated(): + return None + ret_resume = [] + for item in self._api.play_list() or []: + if len(ret_resume) == num: + break + if self.__is_library_blocked(item.ancestor_guid): + continue + ret_resume.append(self.__build_media_server_play_item(item)) + return ret_resume + + def get_latest(self, num=20) -> Optional[List[schemas.MediaServerPlayItem]]: + """ + 获取最近更新列表 + """ + if not self.is_authenticated(): + return None + items = ( + self._api.item_list( + page=1, + page_size=max(100, num * 5), + type=[fnapi.Type.Movie, fnapi.Type.TV], + ) + or [] + ) + latest = [] + for item in items: + if len(latest) == num: + break + if self.__is_library_blocked(item.ancestor_guid): + continue + latest.append(self.__build_media_server_play_item(item)) + return latest + + def get_latest_backdrops(self, num=20, remote=False) -> Optional[List[str]]: + """ + 获取最近更新的媒体Backdrop图片 + """ + if not self.is_authenticated(): + return None + items = ( + self._api.item_list( + page=1, + page_size=max(100, num * 5), + type=[fnapi.Type.Movie, fnapi.Type.TV], + ) + or [] + ) + backdrops = [] + for item in items: + if len(backdrops) == num: + break + if self.__is_library_blocked(item.ancestor_guid): + continue + if (item_details := self._api.item(item.guid)) is None: + continue + if remote: + img_host = self._playhost or self._api.host + else: + img_host = self._api.host + if item_details.backdrops: + item_image = item_details.backdrops + else: + item_image = ( + item_details.posters + if item_details.posters + else item_details.poster + ) + backdrops.append(f"{img_host}{item_image}") + return backdrops + + def __is_library_blocked(self, library_guid: str): + if library := self._libraries.get(library_guid): + if library.category == fnapi.Category.Others: + # 忽略这个库 + return True + return ( + True + if ( + self._sync_libraries + and "all" not in self._sync_libraries + and library_guid not in self._sync_libraries + ) + else False + ) diff --git a/app/schemas/mediaserver.py b/app/schemas/mediaserver.py index 30d1ffaf..1a47c875 100644 --- a/app/schemas/mediaserver.py +++ b/app/schemas/mediaserver.py @@ -14,7 +14,7 @@ class ExistMediaInfo(BaseModel): type: Optional[MediaType] # 季 seasons: Optional[Dict[int, list]] = Field(default_factory=dict) - # 媒体服务器类型:plex、jellyfin、emby + # 媒体服务器类型:plex、jellyfin、emby、trimemedia server_type: Optional[str] = None # 媒体服务器名称 server: Optional[str] = None diff --git a/app/schemas/types.py b/app/schemas/types.py index 7e4e5805..3355f8e9 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -219,6 +219,8 @@ class MediaServerType(Enum): Jellyfin = "Jellyfin" # Plex Plex = "Plex" + # 飞牛影视 + TrimeMedia = "TrimeMedia" # 识别器类型