Files
MoviePilot/app/modules/ugreen/__init__.py

359 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.ugreen.ugreen import Ugreen
from app.schemas import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType
class UgreenModule(_ModuleBase, _MediaServerBase[Ugreen]):
def init_module(self) -> None:
    """
    Initialise the module.

    Registers the Ugreen service with the base class through
    ``init_service``: the service name is the lower-cased ``Ugreen`` class
    name, and the service type is a factory that builds a ``Ugreen`` client
    from a configuration object.
    """

    def build_client(conf):
        # Spread the stored config mapping into keyword arguments and pass
        # the library-sync selection alongside it.
        return Ugreen(**conf.config, sync_libraries=conf.sync_libraries)

    service_key = Ugreen.__name__.lower()
    super().init_service(service_name=service_key, service_type=build_client)
@staticmethod
def get_name() -> str:
return "绿联影视"
@staticmethod
def get_type() -> ModuleType:
    """Return the module type, which is ``ModuleType.MediaServer``."""
    module_type = ModuleType.MediaServer
    return module_type
@staticmethod
def get_subtype() -> MediaServerType:
    """Return the module sub-type, which is ``MediaServerType.Ugreen``."""
    module_subtype = MediaServerType.Ugreen
    return module_subtype
@staticmethod
def get_priority() -> int:
"""
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
"""
return 5
def init_setting(self) -> Tuple[str, Union[str, bool]]:
    """
    Settings hook; this module leaves it unimplemented.

    The body does nothing, so the call evaluates to ``None`` regardless of
    the declared return annotation.
    """
    return None
def scheduler_job(self) -> None:
    """
    Scheduled job (the original note says it is invoked every 10 minutes).

    Walks every registered instance and asks each one that is configured but
    reports itself inactive to reconnect, logging the attempt first.
    """
    for label, client in self.get_instances().items():
        # Unconfigured instances are skipped without probing their state.
        if not client.is_configured():
            continue
        if not client.is_inactive():
            continue
        logger.info(f"绿联影视 {label} 连接断开,尝试重连 ...")
        client.reconnect()
def stop(self):
    """Disconnect every instance that reports an authenticated session."""
    for client in self.get_instances().values():
        if not client.is_authenticated():
            continue
        client.disconnect()
def test(self) -> Optional[Tuple[bool, str]]:
"""
测试模块连接性
"""
if not self.get_instances():
return None
for name, server in self.get_instances().items():
if not server.is_configured():
return False, f"绿联影视配置不完整:{name}"
if server.is_inactive() and not server.reconnect():
return False, f"无法连接绿联影视:{name}"
return True, ""
def user_authenticate(
    self, credentials: AuthCredentials, service_name: Optional[str] = None
) -> Optional[AuthCredentials]:
    """
    Authenticate a user against the Ugreen media server(s).

    :param credentials: login credentials; only ``grant_type == "password"``
        is handled, anything else yields ``None``
    :param service_name: restrict the attempt to this instance; when omitted
        every registered instance is tried in turn
    :return: the same ``credentials`` object with ``channel``, ``service`` and
        ``token`` filled in on the first successful login, otherwise ``None``
    """
    if not credentials:
        return None
    if credentials.grant_type != "password":
        return None

    if not service_name:
        candidates = self.get_instances().items()
    else:
        instance = self.get_instance(service_name)
        candidates = [(service_name, instance)] if instance else []

    for name, server in candidates:
        # Broadcast an interception event first so listeners can veto the
        # attempt for this particular server.
        intercept_payload = AuthInterceptCredentials(
            username=credentials.username,
            channel=self.get_name(),
            service=name,
            status="triggered",
        )
        intercept_event = eventmanager.send_event(
            etype=ChainEventType.AuthIntercept, data=intercept_payload
        )
        vetoed = bool(
            intercept_event
            and intercept_event.event_data
            and intercept_event.event_data.cancel
        )
        if vetoed:
            continue

        token = server.authenticate(credentials.username, credentials.password)
        if not token:
            continue
        credentials.channel = self.get_name()
        credentials.service = name
        credentials.token = token
        return credentials
    return None
def webhook_parser(
    self, body: Any, form: Any, args: Any
) -> Optional[schemas.WebhookEventInfo]:
    """
    Parse a webhook payload.

    :param body: request body, handed to the server's ``get_webhook_message``
    :param form: request form data (not used by this implementation)
    :param args: request arguments; ``args.get("source")`` selects the instance
    :return: the parsed event, or ``None`` when nothing could be parsed
    """
    source = args.get("source")

    if not source:
        # No explicit source: the first instance that yields a message wins.
        for candidate in self.get_instances().values():
            if not candidate:
                continue
            parsed = candidate.get_webhook_message(body)
            if parsed:
                return parsed
        return None

    target = self.get_instance(source)
    if not target:
        return None
    parsed = target.get_webhook_message(body)
    if parsed:
        # Tag the event with the instance name that was asked for.
        parsed.server_name = source
    return parsed
def media_exists(
    self,
    mediainfo: MediaInfo,
    itemid: Optional[str] = None,
    server: Optional[str] = None,
) -> Optional[schemas.ExistMediaInfo]:
    """
    Check whether a media item already exists on a media server.

    :param mediainfo: the recognised media; ``type``, ``title``, ``year``,
        ``tmdb_id`` and ``title_year`` are read from it
    :param itemid: optional server-side item id to look up directly
    :param server: restrict the lookup to this instance; when omitted every
        registered instance is searched in order
    :return: an ``ExistMediaInfo`` for the first server that holds the media,
        or ``None`` when no server does
    """
    if server:
        servers = [(server, self.get_instance(server))]
    else:
        servers = self.get_instances().items()

    for name, s in servers:
        # An unknown / unavailable instance is skipped rather than failing.
        if not s:
            continue

        if mediainfo.type == MediaType.MOVIE:
            if itemid:
                # Direct lookup first; fall back to a search if it misses.
                movie = s.get_iteminfo(itemid)
                if movie:
                    logger.info(f"媒体库 {name} 中找到了 {movie}")
                    return schemas.ExistMediaInfo(
                        type=MediaType.MOVIE,
                        server_type="ugreen",
                        server=name,
                        itemid=movie.item_id,
                    )
            movies = s.get_movies(
                title=mediainfo.title,
                year=mediainfo.year,
                tmdb_id=mediainfo.tmdb_id,
            )
            if not movies:
                logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
                continue
            logger.info(f"媒体库 {name} 中找到了 {movies}")
            return schemas.ExistMediaInfo(
                type=MediaType.MOVIE,
                server_type="ugreen",
                server=name,
                itemid=movies[0].item_id,
            )

        # Everything that is not a movie is treated as a TV show.
        # The id returned by this server is kept in its own local so the
        # caller-supplied ``itemid`` is not overwritten: every server in the
        # loop must be queried with the original value, not with whatever a
        # previous server happened to return.
        found_itemid, tvs = s.get_tv_episodes(
            title=mediainfo.title,
            year=mediainfo.year,
            tmdb_id=mediainfo.tmdb_id,
            item_id=itemid,
        )
        if not tvs:
            logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
            continue
        logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}")
        return schemas.ExistMediaInfo(
            type=MediaType.TV,
            seasons=tvs,
            server_type="ugreen",
            server=name,
            itemid=found_itemid,
        )
    return None
def media_statistic(
    self, server: Optional[str] = None
) -> Optional[List[schemas.Statistic]]:
    """
    Collect media-count statistics.

    :param server: restrict the query to this instance; when omitted every
        registered instance is queried
    :return: ``None`` when a named instance cannot be resolved, otherwise a
        list with one statistic per instance that returned a count, each with
        ``user_count`` filled in from the same instance
    """
    if not server:
        targets = self.get_instances().values()
    else:
        instance = self.get_instance(server)
        if not instance:
            return None
        targets = [instance]

    collected = []
    for target in targets:
        counts = target.get_medias_count()
        if counts:
            counts.user_count = target.get_user_count()
            collected.append(counts)
    return collected
def mediaserver_librarys(
    self, server: Optional[str] = None, hidden: Optional[bool] = False, **kwargs
) -> Optional[List[schemas.MediaServerLibrary]]:
    """
    List the libraries of one instance.

    :param server: name of the instance to query
    :param hidden: forwarded unchanged to the instance's ``get_librarys``
    :param kwargs: accepted but ignored
    :return: whatever ``get_librarys`` returns, or ``None`` when the instance
        cannot be resolved
    """
    instance = self.get_instance(server)
    return instance.get_librarys(hidden=hidden) if instance else None
def mediaserver_items(
    self,
    server: str,
    library_id: Union[str, int],
    start_index: Optional[int] = 0,
    limit: Optional[int] = -1,
) -> Optional[Generator]:
    """
    Fetch the items of one library on one instance.

    :param server: name of the instance to query
    :param library_id: library identifier, passed through as given
    :param start_index: passed through to ``get_items`` (default 0)
    :param limit: passed through to ``get_items`` (default -1)
    :return: whatever ``get_items`` returns, or ``None`` when the instance
        cannot be resolved
    """
    instance = self.get_instance(server)
    if not instance:
        return None
    return instance.get_items(library_id, start_index, limit)
def mediaserver_iteminfo(
    self, server: str, item_id: str
) -> Optional[schemas.MediaServerItem]:
    """
    Fetch the details of a single library item.

    :param server: name of the instance to query
    :param item_id: item identifier, passed to the instance's ``get_iteminfo``
    :return: whatever ``get_iteminfo`` returns, or ``None`` when the instance
        cannot be resolved
    """
    instance = self.get_instance(server)
    if not instance:
        return None
    return instance.get_iteminfo(item_id)
def mediaserver_tv_episodes(
    self, server: str, item_id: Union[str, int]
) -> Optional[List[schemas.MediaServerSeasonInfo]]:
    """
    Fetch the season/episode breakdown of a TV item.

    :param server: name of the instance to query
    :param item_id: item identifier; converted to ``str`` before the lookup
    :return: ``None`` when ``item_id`` is falsy or the instance cannot be
        resolved, ``[]`` when the server reports no seasons, otherwise one
        ``MediaServerSeasonInfo`` per season
    """
    # The instance is only looked up when there is an item id to query.
    instance = self.get_instance(server) if item_id else None
    if not instance:
        return None

    _, season_map = instance.get_tv_episodes(item_id=str(item_id))
    if not season_map:
        return []

    seasons = []
    for season, episodes in season_map.items():
        seasons.append(
            schemas.MediaServerSeasonInfo(season=season, episodes=episodes)
        )
    return seasons
def mediaserver_playing(
    self, server: str, count: Optional[int] = 20, **kwargs
) -> List[schemas.MediaServerPlayItem]:
    """
    Fetch the currently-playing items of one instance.

    The data comes from the instance's ``get_resume``.

    :param server: name of the instance to query
    :param count: forwarded to ``get_resume`` as ``num`` (default 20)
    :param kwargs: accepted but ignored
    :return: the items returned by the server, or an empty list when the
        instance cannot be resolved or returns nothing
    """
    instance = self.get_instance(server)
    if instance:
        items = instance.get_resume(num=count)
        if items:
            return items
    return []
def mediaserver_play_url(
    self, server: str, item_id: Union[str, int]
) -> Optional[str]:
    """
    Fetch the playback URL of a library item.

    :param server: name of the instance to query
    :param item_id: item identifier; converted to ``str`` before the lookup
    :return: whatever the instance's ``get_play_url`` returns, or ``None``
        when ``item_id`` is falsy or the instance cannot be resolved
    """
    # The instance is only looked up when there is an item id to query.
    instance = self.get_instance(server) if item_id else None
    if not instance:
        return None
    return instance.get_play_url(str(item_id))
def mediaserver_latest(
    self,
    server: Optional[str] = None,
    count: Optional[int] = 20,
    **kwargs,
) -> List[schemas.MediaServerPlayItem]:
    """
    Fetch the most recently added items of one instance.

    :param server: name of the instance to query
    :param count: forwarded to the instance's ``get_latest`` as ``num``
        (default 20)
    :param kwargs: accepted but ignored
    :return: the items returned by the server, or an empty list when the
        instance cannot be resolved or returns nothing
    """
    instance = self.get_instance(server)
    if instance:
        newest = instance.get_latest(num=count)
        if newest:
            return newest
    return []
def mediaserver_latest_images(
    self,
    server: Optional[str] = None,
    count: Optional[int] = 20,
    remote: Optional[bool] = False,
    **kwargs,
) -> List[str]:
    """
    Fetch images for the most recently added items of one instance.

    :param server: name of the instance to query
    :param count: forwarded to ``get_latest_backdrops`` as ``num``
        (default 20)
    :param remote: forwarded unchanged to ``get_latest_backdrops``
    :param kwargs: accepted but ignored
    :return: the image list returned by the server, or an empty list when the
        instance cannot be resolved or returns nothing
    """
    instance = self.get_instance(server)
    if instance:
        backdrops = instance.get_latest_backdrops(num=count, remote=remote)
        if backdrops:
            return backdrops
    return []
def mediaserver_image_cookies(
    self,
    server: Optional[str] = None,
    image_url: Optional[str] = None,
    **kwargs,
) -> Optional[str | dict]:
    """
    Fetch the cookies needed to load an image from a Ugreen server.

    :param server: name of the instance to ask; when omitted every registered
        instance is tried and the first truthy answer is returned
    :param image_url: image address; a falsy value short-circuits to ``None``
    :param kwargs: accepted but ignored
    :return: the cookies reported by the instance, or ``None`` when nothing
        is available
    """
    if not image_url:
        return None

    if not server:
        for candidate in self.get_instances().values():
            found = candidate.get_image_cookies(image_url)
            if found:
                return found
        return None

    instance = self.get_instance(server)
    return instance.get_image_cookies(image_url) if instance else None