feat(fanart): 添加异步支持并优化图片处理逻辑

This commit is contained in:
Attente
2026-05-09 23:51:04 +08:00
committed by jxxghp
parent 4027ae2641
commit ee9ea54ab7
3 changed files with 209 additions and 71 deletions

View File

@@ -310,21 +310,6 @@ class MediaInfo:
if isinstance(self.type, str):
self.type = MediaType(self.type)
def set_image(self, name: str, image: str):
"""
设置图片地址
"""
setattr(self, f"{name}_path", image)
def get_image(self, name: str):
"""
获取图片地址
"""
try:
return getattr(self, f"{name}_path")
except AttributeError:
return None
def set_category(self, cat: str):
"""
设置二级分类

View File

@@ -1,4 +1,6 @@
import asyncio
import re
from pathlib import Path
from typing import Optional, Tuple, Union
from app.core.cache import cached
@@ -6,7 +8,7 @@ from app.core.context import MediaInfo, settings
from app.log import logger
from app.modules import _ModuleBase
from app.schemas.types import MediaType, ModuleType, OtherModulesType
from app.utils.http import RequestUtils
from app.utils.http import RequestUtils, AsyncRequestUtils
class FanartModule(_ModuleBase):
@@ -307,8 +309,12 @@ class FanartModule(_ModuleBase):
_proxies: dict = settings.PROXY
# Fanart Api
_movie_url: str = f'https://webservice.fanart.tv/v3/movies/%s?api_key={settings.FANART_API_KEY}'
_tv_url: str = f'https://webservice.fanart.tv/v3/tv/%s?api_key={settings.FANART_API_KEY}'
_movie_url: str = (
f"https://webservice.fanart.tv/v3/movies/%s?api_key={settings.FANART_API_KEY}"
)
_tv_url: str = (
f"https://webservice.fanart.tv/v3/tv/%s?api_key={settings.FANART_API_KEY}"
)
def init_module(self) -> None:
pass
@@ -361,21 +367,112 @@ class FanartModule(_ModuleBase):
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
images = self.__obtain_fanart_images(mediainfo=mediainfo)
if not images:
return None
self.__set_mediainfo_images(mediainfo=mediainfo, images=images)
return mediainfo
async def async_obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
获取图片(异步版本)
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
images = await self.__async_obtain_fanart_images(mediainfo=mediainfo)
if not images:
return None
self.__set_mediainfo_images(mediainfo=mediainfo, images=images)
return mediainfo
@classmethod
def __set_mediainfo_images(cls, mediainfo: MediaInfo, images: dict) -> None:
"""
显式回填 MediaInfo 支持的展示图片字段
"""
for image_name, image_url in images.items():
image_attr = cls.__mediainfo_image_attr(image_name)
if image_attr and not getattr(mediainfo, image_attr, None):
setattr(mediainfo, image_attr, image_url)
logger.debug(f"{mediainfo.title_year} 使用 Fanart 图片回填 {image_attr}{image_name}")
def metadata_img(
self,
mediainfo: MediaInfo,
season: Optional[int] = None,
episode: Optional[int] = None,
) -> Optional[dict]:
"""
获取图片名称和url
:param mediainfo: 媒体信息
:param season: 季号
:param episode: 集号
"""
if episode is not None:
# Fanart 没有集图片
return None
return self.__obtain_fanart_images(mediainfo=mediainfo, season=season)
def __obtain_fanart_images(
self, mediainfo: MediaInfo, season: Optional[int] = None
) -> Optional[dict]:
"""
获取 Fanart 图片并转换为刮削图片名称
"""
query = self.__fanart_query(mediainfo=mediainfo)
if not query:
return None
result = self.__request_fanart(*query)
return self.__extract_images(mediainfo=mediainfo, result=result, season=season)
async def __async_obtain_fanart_images(
self,
mediainfo: MediaInfo,
season: Optional[int] = None,
) -> Optional[dict]:
"""
异步获取 Fanart 图片并转换为刮削图片名称
"""
query = self.__fanart_query(mediainfo=mediainfo)
if not query:
return None
result = await self.__async_request_fanart(*query)
return self.__extract_images(mediainfo=mediainfo, result=result, season=season)
@staticmethod
def __fanart_query(
mediainfo: MediaInfo,
) -> Optional[Tuple[MediaType, Union[str, int]]]:
"""
获取 Fanart 查询参数
"""
if not settings.FANART_ENABLE:
return None
if not mediainfo.tmdb_id and not mediainfo.tvdb_id:
return None
if mediainfo.type == MediaType.MOVIE:
result = self.__request_fanart(mediainfo.type, mediainfo.tmdb_id)
else:
if mediainfo.tvdb_id:
result = self.__request_fanart(mediainfo.type, mediainfo.tvdb_id)
else:
logger.info(f"{mediainfo.title_year} 没有tvdbid无法获取fanart图片")
return None
if not result or result.get('status') == 'error':
return mediainfo.type, mediainfo.tmdb_id
if mediainfo.tvdb_id:
return mediainfo.type, mediainfo.tvdb_id
logger.info(f"{mediainfo.title_year} 没有tvdbid无法获取fanart图片")
return None
def __extract_images(
self,
mediainfo: MediaInfo,
result: Optional[dict],
season: Optional[int] = None,
) -> Optional[dict]:
"""
从 Fanart 响应中提取图片名称和地址
"""
if not result or result.get("status") == "error":
logger.warn(f"没有获取到 {mediainfo.title_year} 的fanart图片数据")
return None
ret = {}
# 获取所有图片
for name, images in result.items():
if not images:
@@ -386,68 +483,92 @@ class FanartModule(_ModuleBase):
# 图片属性xx_path
image_name = self.__name(name)
if image_name.startswith("season"):
image_type = image_name[6:]
# 季图片图片格式seasonxx-xxxx/season-specials-xxxx
for image_obj in images:
image_season = image_obj.get('season')
image_season = image_obj.get("season")
if image_season is not None:
if season is not None and str(image_season) != str(season):
continue
# 包括poster,thumb,banner
if image_season == '0':
season_image = f"season-specials-{image_name[6:]}"
if image_season == "0":
season_image = f"season-specials-{image_type}"
else:
season_image = f"season{str(image_season).rjust(2, '0')}-{image_name[6:]}"
# 设置图片,没有图片才设置
if not mediainfo.get_image(season_image):
mediainfo.set_image(season_image, image_obj.get('url'))
season_image = (
f"season{str(image_season).rjust(2, '0')}-{image_type}"
)
if image_url := image_obj.get("url"):
ret.setdefault(
f"{season_image}{Path(image_url).suffix}", image_url
)
else:
if season is not None:
continue
image_obj = self.__pick_best_image(images)
if image_url := image_obj.get("url"):
ret[f"{image_name}{Path(image_url).suffix}"] = image_url
# 其他图片优先环境变量指定语言再like最多
def __pick_best_image(_images):
lang_env = settings.FANART_LANG
if lang_env:
langs = [lang.strip() for lang in lang_env.split(",") if lang.strip()]
for lang in langs:
lang_images = [img for img in _images if img.get('lang') == lang]
if lang_images:
lang_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return lang_images[0]
# 没设置或没找到,按原逻辑 zh、en、like最多
zh_images = [img for img in _images if img.get('lang') == 'zh']
if zh_images:
zh_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return zh_images[0]
en_images = [img for img in _images if img.get('lang') == 'en']
if en_images:
en_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return en_images[0]
_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return _images[0]
return ret or None
image_obj = __pick_best_image(images)
# 设置图片,没有图片才设置
if not mediainfo.get_image(image_name):
mediainfo.set_image(image_name, image_obj.get('url'))
@staticmethod
def __mediainfo_image_attr(image_name: str) -> Optional[str]:
"""
将 Fanart 刮削图片名映射为 MediaInfo 的显式图片字段
"""
image_key = Path(image_name).stem
if image_key == "poster":
return "poster_path"
if image_key in ("background", "fanart", "backdrop"):
return "backdrop_path"
if image_key == "logo":
return "logo_path"
return None
return mediainfo
@staticmethod
def __pick_best_image(_images: list):
"""
其他图片优先环境变量指定语言再like最多
"""
lang_env = settings.FANART_LANG
if lang_env:
langs = [lang.strip() for lang in lang_env.split(",") if lang.strip()]
for lang in langs:
lang_images = [img for img in _images if img.get("lang") == lang]
if lang_images:
lang_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True)
return lang_images[0]
# 没设置或没找到,按原逻辑 zh、en、like最多
zh_images = [img for img in _images if img.get("lang") == "zh"]
if zh_images:
zh_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True)
return zh_images[0]
en_images = [img for img in _images if img.get("lang") == "en"]
if en_images:
en_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True)
return en_images[0]
_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True)
return _images[0]
@staticmethod
def __name(fanart_name: str) -> str:
"""
转换Fanart图片的名字
"""
words_to_remove = r'tv|movie|hdmovie|hdtv|show|hd'
words_to_remove = r"tv|movie|hdmovie|hdtv|show|hd"
pattern = re.compile(words_to_remove, re.IGNORECASE)
result = re.sub(pattern, '', fanart_name)
result = re.sub(pattern, "", fanart_name)
return result
@classmethod
@cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get")
def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]:
if media_type == MediaType.MOVIE:
image_url = cls._movie_url % queryid
else:
image_url = cls._tv_url % queryid
def __request_fanart(
cls, media_type: MediaType, queryid: Union[str, int]
) -> Optional[dict]:
image_url = cls.__fanart_url(media_type=media_type, queryid=queryid)
try:
ret = RequestUtils(proxies=cls._proxies, timeout=10).get_res(image_url, raise_exception=True)
ret = RequestUtils(proxies=cls._proxies, timeout=10).get_res(
image_url, raise_exception=True
)
if ret:
return ret.json()
else:
@@ -457,10 +578,43 @@ class FanartModule(_ModuleBase):
logger.error(f"获取{queryid}的Fanart图片失败{str(err)}")
return None
@classmethod
@cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get")
async def __async_request_fanart(
cls, media_type: MediaType, queryid: Union[str, int]
) -> Optional[dict]:
image_url = cls.__fanart_url(media_type=media_type, queryid=queryid)
try:
ret = await AsyncRequestUtils(proxies=cls._proxies, timeout=10).get_json(
image_url
)
if ret:
return ret
logger.debug(f"未能获取到 {queryid} 的Fanart图片")
return {}
except Exception as err:
logger.error(f"获取{queryid}的Fanart图片失败{str(err)}")
return None
@classmethod
def __fanart_url(cls, media_type: MediaType, queryid: Union[str, int]) -> str:
"""
生成 Fanart 请求地址
"""
if media_type == MediaType.MOVIE:
return cls._movie_url % queryid
return cls._tv_url % queryid
def clear_cache(self):
"""
清除缓存
"""
logger.info(f"开始清除{self.get_name()}缓存 ...")
self.__request_fanart.cache_clear()
async_cache_clear = self.__async_request_fanart.cache_clear()
try:
loop = asyncio.get_running_loop()
loop.create_task(async_cache_clear)
except RuntimeError:
asyncio.run(async_cache_clear)
logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -126,14 +126,12 @@ class TmdbScraper:
poster_name, poster_url = self.get_season_poster(seasoninfo, season)
if poster_name and poster_url:
images[poster_name] = poster_url
return images
else:
# 获取媒体信息中原有图片TheMovieDb或Fanart
# 获取媒体信息中原有图片
for attr_name, attr_value in vars(mediainfo).items():
if (
attr_value
and attr_name.endswith("_path")
and attr_value
and isinstance(attr_value, str)
and attr_value.startswith("http")
):
@@ -141,6 +139,7 @@ class TmdbScraper:
attr_name.replace("_path", "") + Path(attr_value).suffix
)
images[image_name] = attr_value
# 替换原语言Poster
if settings.TMDB_SCRAP_ORIGINAL_IMAGE:
_mediainfo = self.original_tmdb(mediainfo).get_info(
@@ -154,7 +153,7 @@ class TmdbScraper:
attr_name.replace("_path", "") + Path(image_url).suffix
)
images[image_name] = image_url
return images
return images
@staticmethod
def get_season_poster(seasoninfo: dict, season: int) -> Tuple[str, str]: