feat(cache): add cache region support

This commit is contained in:
InfinityPacer
2025-01-18 02:32:08 +08:00
parent 6d2059447e
commit d9508533e1
3 changed files with 43 additions and 77 deletions

View File

@@ -1,18 +1,15 @@
import inspect
import io
import tempfile
from functools import wraps
from pathlib import Path
from typing import Any, Callable, List
from typing import Any, List
from PIL import Image
from cachetools import TTLCache
from cachetools.keys import hashkey
from app.chain import ChainBase
from app.chain.bangumi import BangumiChain
from app.chain.douban import DoubanChain
from app.chain.tmdb import TmdbChain
from app.core.cache import cache_backend, cached
from app.core.config import settings, global_vars
from app.log import logger
from app.schemas import MediaType
@@ -23,42 +20,7 @@ from app.utils.singleton import Singleton
# 推荐相关的专用缓存
recommend_ttl = 24 * 3600
recommend_cache = TTLCache(maxsize=256, ttl=recommend_ttl)
# 推荐缓存装饰器,避免偶发网络获取数据为空时,页面由于缓存为空长时间渲染异常问题
def cached_with_empty_check(func: Callable):
"""
缓存装饰器,用于缓存函数的返回结果,仅在结果非空时进行缓存
:param func: 被装饰的函数
:return: 包装后的函数
"""
@wraps(func)
def wrapper(*args, **kwargs):
signature = inspect.signature(func)
resolved_kwargs = {}
# 获取默认值并结合传递的参数(如果有)
for param, value in signature.parameters.items():
if param in kwargs:
# 使用显式传递的参数
resolved_kwargs[param] = kwargs[param]
elif value.default is not inspect.Parameter.empty:
# 没有传递参数时使用默认值
resolved_kwargs[param] = value.default
# 使用 cachetools 缓存,构造缓存键
cache_key = f"{func.__name__}_{hashkey(*args, **resolved_kwargs)}"
if cache_key in recommend_cache:
return recommend_cache[cache_key]
result = func(*args, **kwargs)
# 如果返回值为空,则不缓存
if result in [None, [], {}]:
return result
recommend_cache[cache_key] = result
return result
return wrapper
recommend_cache_region = "recommend"
class RecommendChain(ChainBase, metaclass=Singleton):
@@ -78,7 +40,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
刷新推荐
"""
logger.debug("Starting to refresh Recommend data.")
recommend_cache.clear()
cache_backend.clear(region=recommend_cache_region)
logger.debug("Recommend Cache has been cleared.")
# 推荐来源方法
@@ -194,7 +156,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}")
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def tmdb_movies(self, sort_by: str = "popularity.desc", with_genres: str = "",
with_original_language: str = "", page: int = 1) -> Any:
"""
@@ -208,7 +170,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [movie.to_dict() for movie in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def tmdb_tvs(self, sort_by: str = "popularity.desc", with_genres: str = "",
with_original_language: str = "zh|en|ja|ko", page: int = 1) -> Any:
"""
@@ -222,7 +184,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [tv.to_dict() for tv in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def tmdb_trending(self, page: int = 1) -> Any:
"""
TMDB流行趋势
@@ -231,7 +193,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [info.to_dict() for info in infos] if infos else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def bangumi_calendar(self, page: int = 1, count: int = 30) -> Any:
"""
Bangumi每日放送
@@ -240,7 +202,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in medias[(page - 1) * count: page * count]] if medias else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_movie_showing(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣正在热映
@@ -249,7 +211,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_movies(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any:
"""
豆瓣最新电影
@@ -259,7 +221,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_tvs(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any:
"""
豆瓣最新电视剧
@@ -269,7 +231,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_movie_top250(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣电影TOP250
@@ -278,7 +240,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_weekly_chinese(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣国产剧集榜
@@ -287,7 +249,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_weekly_global(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣全球剧集榜
@@ -296,7 +258,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_animation(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣热门动漫
@@ -305,7 +267,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_movie_hot(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣热门电影
@@ -314,7 +276,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_hot(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣热门电视剧

View File

@@ -9,6 +9,9 @@ import redis
from cachetools import TTLCache
from cachetools.keys import hashkey
# 默认缓存区
DEFAULT_CACHE_REGION = "DEFAULT"
class CacheBackend(ABC):
"""
@@ -16,7 +19,7 @@ class CacheBackend(ABC):
"""
@abstractmethod
def set(self, key: str, value: Any, ttl: int, region: str = "default", **kwargs) -> None:
def set(self, key: str, value: Any, ttl: int, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存
@@ -29,7 +32,7 @@ class CacheBackend(ABC):
pass
@abstractmethod
def get(self, key: str, region: str = "default") -> Any:
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存
@@ -40,7 +43,7 @@ class CacheBackend(ABC):
pass
@abstractmethod
def delete(self, key: str, region: str = "default") -> None:
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
删除缓存
@@ -59,7 +62,7 @@ class CacheBackend(ABC):
pass
@staticmethod
def get_region(region: str = "default"):
def get_region(region: str = DEFAULT_CACHE_REGION):
"""
获取缓存的区
"""
@@ -83,7 +86,7 @@ class CacheToolsBackend(CacheBackend):
# 存储各个 region 的缓存实例region -> {key -> TTLCache}
self._region_caches: Dict[str, Dict[str, TTLCache]] = defaultdict(dict)
def set(self, key: str, value: Any, ttl: int = None, region: str = "default", **kwargs) -> None:
def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存值支持每个 key 独立配置 TTL 和 Maxsize
@@ -105,7 +108,7 @@ class CacheToolsBackend(CacheBackend):
# 设置缓存值
cache[key] = value
def get(self, key: str, region: str = "default") -> Any:
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存的值
@@ -121,7 +124,7 @@ class CacheToolsBackend(CacheBackend):
cache = region_cache[key]
return cache.get(key)
def delete(self, key: str, region: str = "default") -> None:
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
删除缓存
@@ -143,6 +146,7 @@ class CacheToolsBackend(CacheBackend):
:param region: 缓存的区
"""
if region:
region = self.get_region(region)
region_cache = self._region_caches[region]
for cache in region_cache.values():
cache.clear()
@@ -176,7 +180,7 @@ class RedisBackend(CacheBackend):
# 使用 region 作为缓存键的一部分
return f"region:{region}:key:{key}"
def set(self, key: str, value: Any, ttl: int = None, region: str = "default", **kwargs) -> None:
def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存
@@ -190,7 +194,7 @@ class RedisBackend(CacheBackend):
redis_key = self.get_redis_key(region, key)
self.client.setex(redis_key, ttl, value)
def get(self, key: str, region: str = "default") -> Any:
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存的值
@@ -202,7 +206,7 @@ class RedisBackend(CacheBackend):
value = self.client.get(redis_key)
return value
def delete(self, key: str, region: str = "default") -> None:
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
删除缓存
@@ -244,11 +248,7 @@ def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend:
return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
# 缓存后端实例
cache_backend = get_cache_backend()
def cached(region: str = "default", maxsize: int = 1000, ttl: int = 1800,
def cached(region: str = DEFAULT_CACHE_REGION, maxsize: int = 1000, ttl: int = 1800,
skip_none: bool = True, skip_empty: bool = True):
"""
自定义缓存装饰器,支持为每个 key 动态传递 maxsize 和 ttl
@@ -318,3 +318,7 @@ def cached(region: str = "default", maxsize: int = 1000, ttl: int = 1800,
return wrapper
return decorator
# 缓存后端实例
cache_backend = get_cache_backend()

View File

@@ -1,8 +1,7 @@
from threading import Thread
from typing import List, Tuple
from cachetools import TTLCache, cached
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.subscribe_oper import SubscribeOper
from app.db.systemconfig_oper import SystemConfigOper
@@ -31,7 +30,7 @@ class SubscribeHelper(metaclass=Singleton):
_sub_fork = f"{settings.MP_SERVER_HOST}/subscribe/fork/%s"
_shares_cache = TTLCache(maxsize=20, ttl=1800)
_shares_cache_region = "subscribe_share"
def __init__(self):
self.systemconfig = SystemConfigOper()
@@ -41,7 +40,7 @@ class SubscribeHelper(metaclass=Singleton):
if self.sub_report():
self.systemconfig.set(SystemConfigKey.SubscribeReport, "1")
@cached(cache=TTLCache(maxsize=20, ttl=1800))
@cached(maxsize=20, ttl=1800)
def get_statistic(self, stype: str, page: int = 1, count: int = 30) -> List[dict]:
"""
获取订阅统计数据
@@ -129,6 +128,7 @@ class SubscribeHelper(metaclass=Singleton):
return False, "订阅不存在"
subscribe_dict = subscribe.to_dict()
subscribe_dict.pop("id")
cache_backend.clear(region=self._shares_cache_region)
res = RequestUtils(proxies=settings.PROXY, content_type="application/json",
timeout=10).post(self._sub_share,
json={
@@ -142,7 +142,7 @@ class SubscribeHelper(metaclass=Singleton):
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
self._shares_cache.clear()
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
@@ -160,7 +160,7 @@ class SubscribeHelper(metaclass=Singleton):
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
self._shares_cache.clear()
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
@@ -181,7 +181,7 @@ class SubscribeHelper(metaclass=Singleton):
else:
return False, res.json().get("message")
@cached(cache=_shares_cache)
@cached(region=_shares_cache_region)
def get_shares(self, name: str, page: int = 1, count: int = 30) -> List[dict]:
"""
获取订阅分享数据