初步支持飞牛影视

This commit is contained in:
景大侠
2025-03-28 15:49:47 +08:00
parent b8cd1c46c1
commit 896eb13f7d
5 changed files with 1364 additions and 1 deletions

View File

@@ -0,0 +1,365 @@
import re
from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.trimemedia.trimemedia import TrimeMedia
from app.schemas import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType
class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]):
def init_module(self) -> None:
"""
初始化模块
"""
super().init_service(
service_name=TrimeMedia.__name__.lower(),
service_type=lambda conf: TrimeMedia(
**conf.config, sync_libraries=conf.sync_libraries
),
)
@staticmethod
def get_name() -> str:
return "飞牛影视"
@staticmethod
def get_type() -> ModuleType:
"""
获取模块类型
"""
return ModuleType.MediaServer
@staticmethod
def get_subtype() -> MediaServerType:
"""
获取模块子类型
"""
return MediaServerType.TrimeMedia
@staticmethod
def get_priority() -> int:
"""
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
"""
return 4
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def scheduler_job(self) -> None:
"""
定时任务每10分钟调用一次
"""
# 定时重连
for name, server in self.get_instances().items():
if server.is_configured() and server.is_inactive():
logger.info(f"飞牛影视 {name} 连接断开,尝试重连 ...")
server.reconnect()
def stop(self):
pass
def test(self) -> Optional[Tuple[bool, str]]:
"""
测试模块连接性
"""
if not self.get_instances():
return None
for name, server in self.get_instances().items():
if not server.is_configured():
return False, f"飞牛影视配置不完整:{name}"
if server.is_inactive() and server.reconnect() != True:
return False, f"无法连接飞牛影视:{name}"
return True, ""
def user_authenticate(
self, credentials: AuthCredentials, service_name: Optional[str] = None
) -> Optional[AuthCredentials]:
"""
使用飞牛影视用户辅助完成用户认证
:param credentials: 认证数据
:param service_name: 指定要认证的媒体服务器名称,若为 None 则认证所有服务
:return: 认证数据
"""
# 飞牛影视认证
if not credentials or credentials.grant_type != "password":
return None
# 确定要认证的服务器列表
if service_name:
# 如果指定了服务名,获取该服务实例
servers = (
[(service_name, server)]
if (server := self.get_instance(service_name))
else []
)
else:
# 如果没有指定服务名,遍历所有服务
servers = self.get_instances().items()
# 遍历要认证的服务器
for name, server in servers:
# 触发认证拦截事件
intercept_event = eventmanager.send_event(
etype=ChainEventType.AuthIntercept,
data=AuthInterceptCredentials(
username=credentials.username,
channel=self.get_name(),
service=name,
status="triggered",
),
)
if intercept_event and intercept_event.event_data:
intercept_data: AuthInterceptCredentials = intercept_event.event_data
if intercept_data.cancel:
continue
token = server.authenticate(credentials.username, credentials.password)
if token:
credentials.channel = self.get_name()
credentials.service = name
credentials.token = token
return credentials
return None
def webhook_parser(
self, body: Any, form: Any, args: Any
) -> Optional[schemas.WebhookEventInfo]:
"""
解析Webhook报文体
:param body: 请求体
:param form: 请求表单
:param args: 请求参数
:return: 字典解析为消息时需要包含title、text、image
"""
source = args.get("source")
if source:
server: TrimeMedia = self.get_instance(source)
if not server:
return None
result = server.get_webhook_message(body)
if result:
result.server_name = source
return result
for server in self.get_instances().values():
if server:
result = server.get_webhook_message(body)
if result:
return result
return None
def media_exists(
self,
mediainfo: MediaInfo,
itemid: Optional[str] = None,
server: Optional[str] = None,
) -> Optional[schemas.ExistMediaInfo]:
"""
判断媒体文件是否存在
:param mediainfo: 识别的媒体信息
:param itemid: 媒体服务器ItemID
:param server: 媒体服务器名称
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
if server:
servers = [(server, self.get_instance(server))]
else:
servers = self.get_instances().items()
for name, s in servers:
if not server:
continue
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = s.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库 {name} 中找到了 {movie}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server_type="trimemedia",
server=name,
itemid=movie.item_id,
)
movies = s.get_movies(
title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
)
if not movies:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"媒体库 {name} 中找到了 {movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server_type="trimemedia",
server=name,
itemid=movies[0].item_id,
)
else:
itemid, tvs = s.get_tv_episodes(
title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid,
)
if not tvs:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(
f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}"
)
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server_type="trimemedia",
server=name,
itemid=itemid,
)
return None
def media_statistic(
self, server: Optional[str] = None
) -> Optional[List[schemas.Statistic]]:
"""
媒体数量统计
"""
if server:
server_obj: TrimeMedia = self.get_instance(server)
if not server_obj:
return None
servers = [server_obj]
else:
servers = self.get_instances().values()
media_statistics = []
for s in servers:
media_statistic = s.get_medias_count()
if not media_statistic:
continue
media_statistic.user_count = s.get_user_count()
media_statistics.append(media_statistic)
return media_statistics
def mediaserver_librarys(
self, server: Optional[str] = None, hidden: Optional[bool] = False, **kwargs
) -> Optional[List[schemas.MediaServerLibrary]]:
"""
媒体库列表
"""
server_obj: TrimeMedia = self.get_instance(server)
if server_obj:
return server_obj.get_librarys(hidden=hidden)
return None
def mediaserver_items(
self,
server: str,
library_id: Union[str, int],
start_index: Optional[int] = 0,
limit: Optional[int] = -1,
) -> Optional[Generator]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server_obj: TrimeMedia = self.get_instance(server)
if server_obj:
return server_obj.get_items(library_id, start_index, limit)
return None
def mediaserver_iteminfo(
self, server: str, item_id: str
) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情
"""
server_obj: TrimeMedia = self.get_instance(server)
if server_obj:
return server_obj.get_iteminfo(item_id)
return None
def mediaserver_tv_episodes(
self, server: str, item_id: Union[str, int]
) -> Optional[List[schemas.MediaServerSeasonInfo]]:
"""
获取剧集信息
"""
server_obj: TrimeMedia = self.get_instance(server)
if not server_obj:
return None
_, seasoninfo = server_obj.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [
schemas.MediaServerSeasonInfo(season=season, episodes=episodes)
for season, episodes in seasoninfo.items()
]
def mediaserver_playing(
self, server: str, count: Optional[int] = 20, **kwargs
) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
server_obj: TrimeMedia = self.get_instance(server)
if not server_obj:
return []
return server_obj.get_resume(num=count)
def mediaserver_play_url(
self, server: str, item_id: Union[str, int]
) -> Optional[str]:
"""
获取媒体库播放地址
"""
server_obj: TrimeMedia = self.get_instance(server)
if not server_obj:
return None
return server_obj.get_play_url(item_id)
def mediaserver_latest(
self,
server: Optional[str] = None,
count: Optional[int] = 20,
**kwargs,
) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
server_obj: TrimeMedia = self.get_instance(server)
if not server_obj:
return []
return server_obj.get_latest(num=count)
def mediaserver_latest_images(
self,
server: Optional[str] = None,
count: Optional[int] = 20,
remote: Optional[bool] = False,
**kwargs,
) -> List[str]:
"""
获取媒体服务器最新入库条目的图片
:param server: 媒体服务器名称
:param count: 获取数量
:param remote: True为外网链接, False为内网链接
:return: 图片链接列表
"""
server_obj: TrimeMedia = self.get_instance(server)
if not server_obj:
return []
return server_obj.get_latest_backdrops(num=count, remote=remote)

View File

@@ -0,0 +1,446 @@
import hashlib
import json
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union
from app.core.config import settings
from app.log import logger
from app.utils.http import RequestUtils, requests
@dataclass
class User:
guid: str
username: str
is_admin: int = 0
class Category(Enum):
Movie = "Movie"
TV = "TV"
Mix = "Mix"
Others = "Others"
@classmethod
def _missing_(self, value):
return self.Others
class Type(Enum):
Movie = "Movie"
TV = "TV"
Season = "Season"
Episode = "Episode"
Video = "Video"
Directory = "Directory"
@classmethod
def _missing_(self, value):
return self.Video
@dataclass
class MediaDb:
guid: str
category: Category
name: Optional[str] = None
posters: Optional[list[str]] = None
dir_list: Optional[list[str]] = None
@dataclass
class MediaDbSumary:
favorite: int = 0
movie: int = 0
tv: int = 0
video: int = 0
total: int = 0
@dataclass
class Item:
guid: str
ancestor_guid: str = ""
type: Optional[Type] = None
# 当type为Episode时是剧名parent_title是季名title作为分集名称
tv_title: Optional[str] = None
parent_title: Optional[str] = None
title: Optional[str] = None
original_title: Optional[str] = None
overview: Optional[str] = None
poster: Optional[str] = None
backdrops: Optional[str] = None
posters: Optional[str] = None
douban_id: Optional[int] = None
imdb_id: Optional[str] = None
trim_id: Optional[str] = None
release_date: Optional[str] = None
air_date: Optional[str] = None
vote_average: Optional[str] = None
season_number: Optional[int] = None
episode_number: Optional[int] = None
duration: Optional[int] = None # 片长(秒)
ts: Optional[int] = None # 已播放(秒)
watched: Optional[int] = None # 1:已看完
@property
def tmdb_id(self) -> Optional[int]:
if self.trim_id is None:
return None
if self.trim_id.startswith("tt") or self.trim_id.startswith("tm"):
# 飞牛给tmdbid加了前缀用以区分tv或movie
return int(self.trim_id[2:])
return None
class Api:
__slots__ = (
"_host",
"_token",
"_apikey",
"_api_path",
"_request_utils",
)
@property
def token(self) -> Optional[str]:
return self._token
@property
def host(self) -> str:
return self._host
@property
def apikey(self) -> str:
return self._apikey
def __init__(self, host: str, apikey: str):
self._api_path = "/v/api/v1"
self._host = host.rstrip("/")
self._apikey = apikey
self._token = None
self._request_utils = RequestUtils(session=requests.Session())
def login(self, username, password) -> Optional[str]:
"""
登录飞牛影视
:return: 成功返回token 否则返回None
"""
if (
res := self.__request_api(
"/login",
data={
"username": username,
"password": password,
"app_name": "trimemedia-web",
},
)
) and res.success:
self._token = res.data.get("token")
return self._token
def logout(self) -> bool:
"""
退出账号
"""
if (res := self.__request_api("/user/logout", method="post")) and res.success:
if res.data == True:
self._token = None
return True
return False
def user_list(self) -> Optional[list[User]]:
"""
用户列表(仅管理员有权访问)
"""
if (res := self.__request_api("/manager/user/list")) and res.success:
return [
User(
guid=info.get("guid"),
username=info.get("username"),
is_admin=info.get("is_admin", 0),
)
for info in res.data
]
return None
def user_info(self) -> Optional[User]:
"""
当前用户信息
"""
if (res := self.__request_api("/user/info")) and res.success:
user = User("", "")
user.__dict__.update(res.data)
return user
return None
def mediadb_sum(self) -> Optional[MediaDbSumary]:
"""
媒体数量统计
"""
if (res := self.__request_api("/mediadb/sum")) and res.success:
sum = MediaDbSumary()
sum.__dict__.update(res.data)
return sum
return None
def mediadb_list(self) -> Optional[MediaDbSumary]:
"""
媒体库列表(普通用户)
"""
if (res := self.__request_api("/mediadb/list")) and res.success:
items = []
for info in res.data:
mdb = MediaDb(
guid=info.get("guid"),
category=Category(info.get("category")),
name=info.get("title", ""),
posters=[
self.__build_img_api_url(poster)
for poster in info.get("posters", [])
],
)
items.append(mdb)
return items
return None
def __build_img_api_url(self, img_path: Optional[str]) -> Optional[str]:
if not img_path:
return None
if img_path[0] != "/":
img_path = "/" + img_path
return f"{self._api_path}/sys/img{img_path}"
def mdb_list(self) -> Optional[list[MediaDb]]:
"""
媒体库列表(管理员)
"""
if (res := self.__request_api("/mdb/list")) and res.success:
items = []
for info in res.data:
mdb = MediaDb(
guid=info.get("guid"),
category=Category(info.get("category")),
name=info.get("name", ""),
posters=[
self.__build_img_api_url(poster)
for poster in info.get("posters", [])
],
dir_list=info.get("dir_list"),
)
items.append(mdb)
return items
return None
def mdb_scanall(self) -> bool:
"""
扫描所有媒体库
"""
if (res := self.__request_api("/mdb/scanall", method="post")) and res.success:
if res.data == True:
self._token = None
return True
return False
def mdb_scan(self, mdb: MediaDb) -> bool:
"""
扫描指定媒体库
"""
if (
res := self.__request_api(f"/mdb/scan/{mdb.guid}", data={})
) and res.success:
if res.data == True:
self._token = None
return True
return False
def __build_item(self, info: dict) -> Item:
"""
构造媒体Item
"""
item = Item(guid="")
item.__dict__.update(info)
item.type = Type(info.get("type"))
# Item详情接口才有posters和backdrops
item.posters = self.__build_img_api_url(item.posters)
item.backdrops = self.__build_img_api_url(item.backdrops)
item.poster = (
self.__build_img_api_url(item.poster) if item.poster else item.posters
)
return item
def item_list(
self,
guid: Optional[str] = None,
type: Optional[list[Type]] = [Type.Movie, Type.TV, Type.Directory, Type.Video],
exclude_grouped_video=True,
page=1,
page_size=22,
sort_by="create_time",
sort="DESC",
) -> Optional[list[Item]]:
"""
媒体列表
"""
post = {
"tags": {"type": type} if type else {},
"sort_type": sort,
"sort_column": sort_by,
"page": page,
"page_size": page_size,
}
if guid:
post["ancestor_guid"] = guid
if exclude_grouped_video:
post["exclude_grouped_video"] = 1
if (res := self.__request_api("/item/list", data=post)) and res.success:
return [self.__build_item(info) for info in res.data.get("list", [])]
return None
def search_list(self, keywords: str) -> Optional[list[Item]]:
"""
搜索影片、演员
"""
if (
res := self.__request_api("/search/list", params={"q": keywords})
) and res.success:
return [self.__build_item(info) for info in res.data]
return None
def item(self, guid: str) -> Optional[Item]:
""" """
if (res := self.__request_api(f"/item/{guid}")) and res.success:
return self.__build_item(res.data)
return None
def season_list(self, tv_guid: str) -> Optional[list[Item]]:
""" """
if (res := self.__request_api(f"/season/list/{tv_guid}")) and res.success:
return [self.__build_item(info) for info in res.data]
return None
def episode_list(self, season_guid: str) -> Optional[list[Item]]:
""" """
if (res := self.__request_api(f"/episode/list/{season_guid}")) and res.success:
return [self.__build_item(info) for info in res.data]
return None
def play_list(self) -> Optional[list[Item]]:
"""
继续观看列表
"""
if (res := self.__request_api("/play/list")) and res.success:
return [self.__build_item(info) for info in res.data]
return None
################################################################
def __get_authx(self, api_path, body: Optional[str]):
"""
计算消息签名
"""
if api_path[0] != "/":
api_path = "/" + api_path
nonce = str(random.randint(100000, 999999))
ts = str(int(time.time() * 1000))
md5 = hashlib.md5()
md5.update((body or "").encode())
data_hash = md5.hexdigest()
md5 = hashlib.md5()
md5.update(
"_".join(
[
"NDzZTVxnRKP8Z0jXg1VAMonaG8akvh",
api_path,
nonce,
ts,
data_hash,
self._apikey,
]
).encode()
)
sign = md5.hexdigest()
return f"nonce={nonce}&timestamp={ts}&sign={sign}"
def __request_api(
self, api: str, method: str = None, params: dict = None, data: dict = None
):
"""
请求飞牛影视API
"""
@dataclass
class Result:
@property
def success(self) -> bool:
return code == 0
code: int
msg: Optional[str] = None
data: Optional[Union[dict, list, str, bool]] = None
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Type):
return obj.value
return super().default(self, obj)
if not self._host or not api:
return None
if api[0] != "/":
api = "/" + api
api_path = self._api_path + api
url = self._host + api_path
if method is None:
method = "get" if data is None else "post"
if method == "post":
json_body = (
json.dumps(data, allow_nan=False, cls=JsonEncoder) if data else ""
)
else:
json_body = None
headers = {
"User-Agent": settings.USER_AGENT,
"Authorization": self._token,
"authx": self.__get_authx(api_path, json_body),
}
if json_body is not None:
headers["Content-Type"] = "application/json"
try:
res = self._request_utils.request(
method=method, url=url, headers=headers, params=params, data=json_body
)
if res:
resp = res.json()
msg = resp.get("msg")
if code := int(resp.get("code", -1)):
logger.error(f"请求接口 {api_path} 失败,错误码:{code} {msg}")
return Result(code, msg)
return Result(0, msg, resp.get("data"))
else:
logger.error(f"请求接口 {api_path} 失败")
except Exception as e:
logger.error(f"请求接口 {api_path} 异常:" + str(e))
return None
if __name__ == "__main__":
api = Api("http://192.168.1.49:5666/", "16CCEB3D-AB42-077D-36A1-F355324E4237")
api.login("adad", "123456")
logger.debug(f"token={api.token}")
user = api.user_info()
logger.debug(user)
mediadbs = api.mdb_list()
logger.debug(mediadbs)
items = api.item_list(mediadbs[0].guid, page=1, page_size=0)
logger.debug(items)
api.logout()

View File

@@ -0,0 +1,550 @@
import json
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple, Union
import app.modules.trimemedia.api as fnapi
from app import schemas
from app.log import logger
from app.schemas import MediaType
from app.utils.url import UrlUtils
class TrimeMedia:
_username: Optional[str] = None
_password: Optional[str] = None
_userinfo: Optional[fnapi.User] = None
_playhost: Optional[str] = None
_libraries: dict[str, fnapi.MediaDb] = {}
_sync_libraries: List[str] = []
_api: Optional[fnapi.Api] = None
def __init__(
self,
host: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
play_host: Optional[str] = None,
sync_libraries: list = None,
**kwargs,
):
if not host or not username or not password:
logger.error("飞牛影视配置不完整!!")
return
host = UrlUtils.standardize_base_url(host).rstrip("/")
if play_host:
self._playhost = UrlUtils.standardize_base_url(play_host).rstrip("/")
self._username = username
self._password = password
self._sync_libraries = sync_libraries or []
self._api = fnapi.Api(host, apikey="16CCEB3D-AB42-077D-36A1-F355324E4237")
self.reconnect()
def is_configured(self) -> bool:
return self._api is not None
def is_authenticated(self) -> bool:
return self.is_configured() and self._api.token is not None
def is_inactive(self) -> bool:
"""
判断是否需要重连
"""
if not self.is_authenticated():
return True
self._userinfo = self._api.user_info()
return self._userinfo is None
def reconnect(self):
"""
重连
"""
if not self.is_configured():
return False
if self._api.login(self._username, self._password) is None:
return False
self._userinfo = self._api.user_info()
if self._userinfo is None:
return False
logger.debug(f"{self._userinfo.username} 成功登录飞牛影视")
return True
def get_librarys(
self, hidden: Optional[bool] = False
) -> List[schemas.MediaServerLibrary]:
"""
获取媒体服务器所有媒体库列表
"""
if not self.is_authenticated():
return []
if self._userinfo.is_admin == 1:
mdb_list = self._api.mdb_list() or []
else:
mdb_list = self._api.mediadb_list() or []
self._libraries = {lib.guid: lib for lib in mdb_list}
libraries = []
for library in self._libraries.values():
if hidden and self.__is_library_blocked(library.guid):
continue
if library.category == fnapi.Category.Movie:
library_type = MediaType.MOVIE.value
elif library.category == fnapi.Category.TV:
library_type = MediaType.TV.value
elif library.category == fnapi.Category.Others:
# 忽略这个库
continue
else:
library_type = MediaType.UNKNOWN.value
libraries.append(
schemas.MediaServerLibrary(
server="trimemedia",
id=library.guid,
name=library.name,
type=library_type,
path=library.dir_list,
image_list=[
f"{self._api.host}{img_path}?w=256"
for img_path in library.posters or []
],
link=f"{self._playhost or self._api.host}/v/library/{library.guid}",
)
)
return libraries
def get_user_count(self) -> int:
"""
获取用户数量(非管理员不能调用)
"""
if not self.is_authenticated():
return 0
if not self._userinfo or self._userinfo.is_admin != 1:
return 0
return len(self._api.user_list() or [])
def get_medias_count(self) -> schemas.Statistic:
"""
获取媒体数量
:return: MovieCount SeriesCount
"""
if not self.is_authenticated():
return schemas.Statistic()
if (info := self._api.mediadb_sum()) is None:
return schemas.Statistic()
return schemas.Statistic(
movie_count=info.movie,
tv_count=info.tv,
)
def authenticate(self, username: str, password: str) -> Optional[str]:
"""
用户认证
:param username: 用户名
:param password: 密码
:return: 认证成功返回token否则返回None
"""
if not username or not password:
return None
if not self.is_configured():
return None
feiniu = fnapi.Api(self._api.host, self._api.apikey)
if token := feiniu.login(username, password):
feiniu.logout()
return token
def get_movies(
self, title: str, year: Optional[str] = None, tmdb_id: Optional[int] = None
) -> Optional[List[schemas.MediaServerItem]]:
"""
根据标题和年份,检查电影是否在飞牛中存在,存在则返回列表
:param title: 标题
:param year: 年份,为空则不过滤
:param tmdb_id: TMDB ID
:return: 含title、year属性的字典列表
"""
if not self.is_authenticated():
return None
movies = []
items = self._api.search_list(keywords=title) or []
for item in items:
if item.type != fnapi.Type.Movie:
continue
if (
(not tmdb_id or tmdb_id == item.tmdb_id)
and title in [item.title, item.original_title]
and (not year or (item.release_date and item.release_date[:4] == year))
):
movies.append(self.__build_media_server_item(item))
return movies
def __get_series_id_by_name(self, name: str, year: str) -> Optional[str]:
items = self._api.search_list(keywords=name) or []
for item in items:
if item.type != fnapi.Type.TV:
continue
# 可惜搜索接口不下发original_title 也不能指定分类、年份
if name in [item.title, item.original_title]:
if not year or (item.air_date and item.air_date[:4] == year):
return item.guid
return None
def get_tv_episodes(
self,
item_id: Optional[str] = None,
title: Optional[str] = None,
year: Optional[str] = None,
tmdb_id: Optional[int] = None,
season: Optional[int] = None,
) -> Tuple[Optional[str], Optional[Dict[int, list]]]:
"""
根据标题和年份和季,返回飞牛中的剧集列表
:param item_id: 飞牛影视中的guid
:param title: 标题
:param year: 年份
:param tmdb_id: TMDBID
:param season: 季
:return: 集号的列表
"""
if not self.is_authenticated():
return None, None
if not item_id:
item_id = self.__get_series_id_by_name(title, year)
if item_id is None:
return None, None
item_info = self.get_iteminfo(item_id)
if not item_info:
return None, {}
if tmdb_id and item_info.tmdbid:
if tmdb_id != item_info.tmdbid:
return None, {}
seasons = self._api.season_list(item_id)
if not seasons:
# 季列表获取失败
return None, {}
if season is not None:
for item in seasons:
if item.season_number == season:
seasons = [item]
break
else:
# 没有匹配的季
return None, {}
season_episodes = {}
for item in seasons:
episodes = self._api.episode_list(item.guid)
for episode in episodes or []:
if episode.season_number not in season_episodes:
season_episodes[episode.season_number] = []
season_episodes[episode.season_number].append(episode.episode_number)
return item_id, season_episodes
def refresh_root_library(self) -> Optional[bool]:
"""
通知飞牛刷新整个媒体库(非管理员不能调用)
"""
if not self.is_authenticated():
return None
if not self._userinfo or self._userinfo.is_admin != 1:
logger.error("飞牛仅支持管理员账号刷新媒体库")
return False
logger.info("刷新所有媒体库")
return self._api.mdb_scanall()
def refresh_library_by_items(
self, items: List[schemas.RefreshMediaItem]
) -> Optional[bool]:
"""
按路径刷新所在的媒体库(非管理员不能调用)
:param items: 已识别的需要刷新媒体库的媒体信息列表
"""
if not self.is_authenticated():
return None
if not self._userinfo or self._userinfo.is_admin != 1:
logger.error("飞牛仅支持管理员账号刷新媒体库")
return False
libraries = set()
for item in items:
lib = self.__match_library_by_path(item.target_path)
if lib is None:
# 如果有匹配失败的,刷新整个库
return self._api.mdb_scanall()
# 媒体库去重
libraries.add(lib.guid)
for lib_guid in libraries:
# 逐个刷新
lib = self._libraries[lib_guid]
logger.info(f"刷新媒体库:{lib.name}")
if not self._api.mdb_scan(lib):
# 如果失败,刷新整个库
return self._api.mdb_scanall()
return True
def __match_library_by_path(self, path: Path) -> Optional[fnapi.MediaDb]:
def is_subpath(_path: Path, _parent: Path) -> bool:
"""
判断_path是否是_parent的子目录下
"""
_path = _path.resolve()
_parent = _parent.resolve()
return _path.parts[: len(_parent.parts)] == _parent.parts
if path is None:
return None
for lib in self._libraries.values():
for dir in lib.dir_list or []:
if is_subpath(path, Path(dir)):
return lib
return None
# TODO 飞牛似乎还没有这个功能
def get_webhook_message(self, body: any) -> Optional[schemas.WebhookEventInfo]:
return None
def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]:
"""
获取单个项目详情
"""
if not self.is_authenticated():
return None
if item := self._api.item(guid=itemid):
return self.__build_media_server_item(item)
return None
@staticmethod
def __build_media_server_item(item: fnapi.Item):
if item.air_date and item.type == fnapi.Type.TV:
year = item.air_date[:4]
elif item.release_date:
year = item.release_date[:4]
else:
year = None
user_state = schemas.MediaServerItemUserState()
if item.watched:
user_state.played = True
if item.duration and item.ts is not None:
user_state.percentage = item.ts / item.duration
user_state.resume = True
if item.type is None:
item_type = None
else:
# 将飞牛的媒体类型转为MP能识别的
item_type = "Series" if item.type == fnapi.Type.TV else item.type.value
return schemas.MediaServerItem(
server="trimemedia",
library=item.ancestor_guid,
item_id=item.guid,
item_type=item_type,
title=item.title,
original_title=item.original_title,
year=year,
tmdbid=item.tmdb_id,
imdbid=item.imdb_id,
user_state=user_state,
)
@staticmethod
def __build_play_url(host: str, item: fnapi.Item) -> str:
"""
拼装播放链接
"""
if item.type == fnapi.Type.Episode:
return f"{host}/v/tv/episode/{item.guid}"
elif item.type == fnapi.Type.Season:
return f"{host}/v/tv/season/{item.guid}"
elif item.type == fnapi.Type.Movie:
return f"{host}/v/movie/{item.guid}"
elif item.type == fnapi.Type.TV:
return f"{host}/v/tv/{item.guid}"
else:
# 其它类型走通用页面,由飞牛来判断
return f"{host}/v/other/{item.guid}"
def __build_media_server_play_item(
self, item: fnapi.Item
) -> schemas.MediaServerPlayItem:
"""
:params use_backdrop: 是否优先使用Backdrop类型的图片
"""
if item.type == fnapi.Type.Episode:
title = item.tv_title
subtitle = f"S{item.season_number}:{item.episode_number} - {item.title}"
else:
title = item.title
subtitle = "电影" if item.type == fnapi.Type.Movie else "视频"
type = (
MediaType.MOVIE.value
if item.type in [fnapi.Type.Movie, fnapi.Type.Video]
else MediaType.TV.value
)
return schemas.MediaServerPlayItem(
id=item.guid,
title=title,
subtitle=subtitle,
type=type,
image=f"{self._api.host}{item.poster}",
link=self.__build_play_url(self._playhost or self._api.host, item),
percent=(
item.ts / item.duration * 100.0
if item.duration and item.ts is not None
else 0
),
)
def get_items(
self,
parent: Union[str, int],
start_index: Optional[int] = 0,
limit: Optional[int] = -1,
) -> Generator[schemas.MediaServerItem | None | Any, Any, None]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not self.is_authenticated():
return None
if (SIZE := limit) is None:
SIZE = -1
items = (
self._api.item_list(
guid=parent,
page=start_index + 1,
page_size=SIZE,
type=[fnapi.Type.Movie, fnapi.Type.TV, fnapi.Type.Directory],
)
or []
)
for item in items:
if item.type == fnapi.Type.Directory:
for items in self.get_items(parent=item.guid):
yield items
elif item.type in [fnapi.Type.Movie, fnapi.Type.TV]:
yield self.__build_media_server_item(item)
return None
def get_play_url(self, item_id: str) -> str:
"""
获取媒体的外网播放链接
:param item_id: 媒体ID
"""
if not self.is_authenticated():
return None
if (item := self._api.item(item_id)) is None:
return None
# 根据查询到的信息拼装出播放链接
return self.__build_play_url(self._playhost or self._api.host, item)
def get_resume(
self, num: Optional[int] = 12
) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获取继续观看列表
:param num: 列表大小None不限制数量
"""
if not self.is_authenticated():
return None
ret_resume = []
for item in self._api.play_list() or []:
if len(ret_resume) == num:
break
if self.__is_library_blocked(item.ancestor_guid):
continue
ret_resume.append(self.__build_media_server_play_item(item))
return ret_resume
def get_latest(self, num=20) -> Optional[List[schemas.MediaServerPlayItem]]:
"""
获取最近更新列表
"""
if not self.is_authenticated():
return None
items = (
self._api.item_list(
page=1,
page_size=max(100, num * 5),
type=[fnapi.Type.Movie, fnapi.Type.TV],
)
or []
)
latest = []
for item in items:
if len(latest) == num:
break
if self.__is_library_blocked(item.ancestor_guid):
continue
latest.append(self.__build_media_server_play_item(item))
return latest
def get_latest_backdrops(self, num=20, remote=False) -> Optional[List[str]]:
"""
获取最近更新的媒体Backdrop图片
"""
if not self.is_authenticated():
return None
items = (
self._api.item_list(
page=1,
page_size=max(100, num * 5),
type=[fnapi.Type.Movie, fnapi.Type.TV],
)
or []
)
backdrops = []
for item in items:
if len(backdrops) == num:
break
if self.__is_library_blocked(item.ancestor_guid):
continue
if (item_details := self._api.item(item.guid)) is None:
continue
if remote:
img_host = self._playhost or self._api.host
else:
img_host = self._api.host
if item_details.backdrops:
item_image = item_details.backdrops
else:
item_image = (
item_details.posters
if item_details.posters
else item_details.poster
)
backdrops.append(f"{img_host}{item_image}")
return backdrops
def __is_library_blocked(self, library_guid: str):
if library := self._libraries.get(library_guid):
if library.category == fnapi.Category.Others:
# 忽略这个库
return True
return (
True
if (
self._sync_libraries
and "all" not in self._sync_libraries
and library_guid not in self._sync_libraries
)
else False
)

View File

@@ -14,7 +14,7 @@ class ExistMediaInfo(BaseModel):
type: Optional[MediaType]
# 季
seasons: Optional[Dict[int, list]] = Field(default_factory=dict)
# 媒体服务器类型plex、jellyfin、emby
# 媒体服务器类型plex、jellyfin、emby、trimemedia
server_type: Optional[str] = None
# 媒体服务器名称
server: Optional[str] = None

View File

@@ -219,6 +219,8 @@ class MediaServerType(Enum):
Jellyfin = "Jellyfin"
# Plex
Plex = "Plex"
# 飞牛影视
TrimeMedia = "TrimeMedia"
# 识别器类型