Files
MoviePilot/app/chain/recommend.py
2024-12-25 23:07:56 +08:00

320 lines
12 KiB
Python

import inspect
import io
import tempfile
from functools import wraps
from pathlib import Path
from typing import Any, Callable, List
from PIL import Image
from cachetools import TTLCache
from cachetools.keys import hashkey
from app.chain import ChainBase
from app.chain.bangumi import BangumiChain
from app.chain.douban import DoubanChain
from app.chain.tmdb import TmdbChain
from app.core.config import settings
from app.log import logger
from app.schemas import MediaType
from app.utils.common import log_execution_time
from app.utils.http import RequestUtils
from app.utils.security import SecurityUtils
from app.utils.singleton import Singleton
# Dedicated cache for recommendation data; every entry lives for one day
recommend_ttl = 24 * 60 * 60
recommend_cache = TTLCache(ttl=recommend_ttl, maxsize=256)
# Recommendation cache decorator. Empty results are never stored, so an
# occasional failed network fetch cannot leave pages rendering an empty list
# until the cache entry expires.
def cached_with_empty_check(func: Callable):
    """
    Cache the return value of ``func`` in ``recommend_cache``, but only when
    the result is non-empty.

    The cache key is built from the function name and the fully resolved call
    arguments: positional and keyword arguments are normalised against the
    signature and defaults are filled in, so equivalent calls share one entry.

    :param func: the function to decorate
    :return: the wrapped function
    """
    # The signature is fixed, so inspect it once instead of on every call
    signature = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            bound = signature.bind(*args, **kwargs)
        except TypeError:
            # Invalid call: let the wrapped function raise its own error
            return func(*args, **kwargs)
        # Fill in defaults so omitted and explicit default values map to the same key
        bound.apply_defaults()
        cache_key = f"{func.__name__}_{hashkey(*bound.args, **bound.kwargs)}"
        # EAFP lookup: an entry may expire between a membership test and the
        # read, which would surface as a KeyError
        try:
            return recommend_cache[cache_key]
        except KeyError:
            pass
        result = func(*args, **kwargs)
        # Do not cache empty results
        if result in [None, [], {}]:
            return result
        recommend_cache[cache_key] = result
        return result

    return wrapper
class RecommendChain(ChainBase, metaclass=Singleton):
    """
    Recommendation processing chain, run as a singleton.
    """

    def __init__(self):
        super().__init__()
        self.tmdbchain = TmdbChain()
        self.doubanchain = DoubanChain()
        self.bangumichain = BangumiChain()
        # Maximum number of pages fetched per source when refreshing the cache
        self.cache_max_pages = 5

    def refresh_recommend(self):
        """
        Clear the recommendation cache, re-fetch every recommendation source
        page by page, and cache the posters of everything that was fetched.
        """
        logger.debug("Starting to refresh Recommend data.")
        recommend_cache.clear()
        logger.debug("Recommend Cache has been cleared.")

        # Recommendation source methods
        recommend_methods = [
            self.tmdb_movies,
            self.tmdb_tvs,
            self.tmdb_trending,
            self.bangumi_calendar,
            self.douban_movie_showing,
            self.douban_movies,
            self.douban_tvs,
            self.douban_movie_top250,
            self.douban_tv_weekly_chinese,
            self.douban_tv_weekly_global,
            self.douban_tv_animation,
            self.douban_movie_hot,
            self.douban_tv_hot,
        ]

        # All recommendation data collected during this refresh
        recommends = []
        # Sources that have already returned an empty page
        methods_finished = set()

        # To avoid calling the same source several times in a row, iterate
        # over every source for one page before moving on to the next page
        for page in range(1, self.cache_max_pages + 1):
            for method in recommend_methods:
                if method in methods_finished:
                    continue
                logger.debug(f"Fetch {method.__name__} data for page {page}.")
                data = method(page=page)
                if not data:
                    logger.debug(f"{method.__name__} returned no data for page {page}. "
                                 f"Skipping its remaining pages.")
                    methods_finished.add(method)
                    continue
                recommends.extend(data)

            # Stop paginating once every source has run out of data
            if len(methods_finished) == len(recommend_methods):
                logger.debug("All recommendation methods have finished fetching data. "
                             "Ending pagination early.")
                break

        # Cache the posters that were collected
        self.__cache_posters(recommends)
        logger.debug("Recommend data refresh completed.")

    def __cache_posters(self, datas: List[dict]):
        """
        Extract ``poster_path`` from each item and cache the image locally.

        Does nothing when ``settings.GLOBAL_IMAGE_CACHE`` is disabled.

        :param datas: list of recommendation items
        """
        if not settings.GLOBAL_IMAGE_CACHE:
            return

        for data in datas:
            poster_path = data.get("poster_path")
            if poster_path:
                # Swap the "original" size segment of the URL for "w500"
                poster_url = poster_path.replace("original", "w500")
                logger.debug(f"Caching poster image: {poster_url}")
                self.__fetch_and_save_image(poster_url)

    @staticmethod
    def __fetch_and_save_image(url: str):
        """
        Download an image and store it in the local image cache.

        Failures are logged at debug level and never raised.

        :param url: image URL
        """
        if not settings.GLOBAL_IMAGE_CACHE or not url:
            return

        # Build the cache path
        sanitized_path = SecurityUtils.sanitize_url_path(url)
        cache_path = settings.CACHE_PATH / "images" / sanitized_path

        # Make sure the cache path and the file type are allowed
        if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES):
            logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}")
            return

        # Skip when the image is already cached locally
        if cache_path.exists():
            logger.debug(f"Cache hit: Image already exists at {cache_path}")
            return

        # Request the remote image; Douban images are fetched with a Douban
        # referer and without the proxy, everything else goes through the proxy
        referer = "https://movie.douban.com/" if "doubanio.com" in url else None
        proxies = settings.PROXY if not referer else None
        response = RequestUtils(ua=settings.USER_AGENT, proxies=proxies, referer=referer).get_res(url=url)
        if not response:
            logger.debug(f"Empty response for URL: {url}")
            return

        # Verify that the downloaded content is a valid image
        try:
            Image.open(io.BytesIO(response.content)).verify()
        except Exception as e:
            logger.debug(f"Invalid image format for URL {url}: {e}")
            return

        # Write to a temporary file in the target directory, then rename it
        # into place so a partially written file never appears at cache_path
        temp_path = None
        try:
            cache_path.parent.mkdir(parents=True, exist_ok=True)
            with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
                temp_path = Path(tmp_file.name)
                tmp_file.write(response.content)
            temp_path.replace(cache_path)
            logger.debug(f"Successfully cached image at {cache_path} for URL: {url}")
        except Exception as e:
            logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}")
            # Do not leave an orphaned temporary file in the cache directory
            if temp_path is not None:
                try:
                    temp_path.unlink()
                except OSError:
                    pass

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def tmdb_movies(self, sort_by: str = "popularity.desc", with_genres: str = "",
                    with_original_language: str = "", page: int = 1) -> Any:
        """
        TMDB popular movies
        """
        movies = self.tmdbchain.tmdb_discover(mtype=MediaType.MOVIE,
                                              sort_by=sort_by,
                                              with_genres=with_genres,
                                              with_original_language=with_original_language,
                                              page=page)
        return [movie.to_dict() for movie in movies] if movies else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def tmdb_tvs(self, sort_by: str = "popularity.desc", with_genres: str = "",
                 with_original_language: str = "zh|en|ja|ko", page: int = 1) -> Any:
        """
        TMDB popular TV series
        """
        tvs = self.tmdbchain.tmdb_discover(mtype=MediaType.TV,
                                           sort_by=sort_by,
                                           with_genres=with_genres,
                                           with_original_language=with_original_language,
                                           page=page)
        return [tv.to_dict() for tv in tvs] if tvs else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def tmdb_trending(self, page: int = 1) -> Any:
        """
        TMDB trending
        """
        infos = self.tmdbchain.tmdb_trending(page=page)
        return [info.to_dict() for info in infos] if infos else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def bangumi_calendar(self, page: int = 1, count: int = 30) -> Any:
        """
        Bangumi daily broadcast calendar
        """
        medias = self.bangumichain.calendar()
        # The calendar is fetched as a whole, so paginate it locally
        return [media.to_dict() for media in medias[(page - 1) * count: page * count]] if medias else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_movie_showing(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban movies now showing
        """
        movies = self.doubanchain.movie_showing(page=page, count=count)
        return [media.to_dict() for media in movies] if movies else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_movies(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any:
        """
        Douban latest movies
        """
        movies = self.doubanchain.douban_discover(mtype=MediaType.MOVIE,
                                                  sort=sort, tags=tags, page=page, count=count)
        return [media.to_dict() for media in movies] if movies else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_tvs(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any:
        """
        Douban latest TV series
        """
        tvs = self.doubanchain.douban_discover(mtype=MediaType.TV,
                                               sort=sort, tags=tags, page=page, count=count)
        return [media.to_dict() for media in tvs] if tvs else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_movie_top250(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban movie TOP250
        """
        movies = self.doubanchain.movie_top250(page=page, count=count)
        return [media.to_dict() for media in movies] if movies else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_tv_weekly_chinese(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban Chinese TV series weekly chart
        """
        tvs = self.doubanchain.tv_weekly_chinese(page=page, count=count)
        return [media.to_dict() for media in tvs] if tvs else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_tv_weekly_global(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban global TV series weekly chart
        """
        tvs = self.doubanchain.tv_weekly_global(page=page, count=count)
        return [media.to_dict() for media in tvs] if tvs else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_tv_animation(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban popular animation
        """
        tvs = self.doubanchain.tv_animation(page=page, count=count)
        return [media.to_dict() for media in tvs] if tvs else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_movie_hot(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban popular movies
        """
        movies = self.doubanchain.movie_hot(page=page, count=count)
        return [media.to_dict() for media in movies] if movies else []

    @log_execution_time(logger=logger)
    @cached_with_empty_check
    def douban_tv_hot(self, page: int = 1, count: int = 30) -> Any:
        """
        Douban popular TV series
        """
        tvs = self.doubanchain.tv_hot(page=page, count=count)
        return [media.to_dict() for media in tvs] if tvs else []