From d9508533e16e19b3e1a5683e45cba107e53d30f3 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Sat, 18 Jan 2025 02:32:08 +0800 Subject: [PATCH] feat(cache): add cache region support --- app/chain/recommend.py | 72 ++++++++++------------------------------- app/core/cache.py | 34 ++++++++++--------- app/helper/subscribe.py | 14 ++++---- 3 files changed, 43 insertions(+), 77 deletions(-) diff --git a/app/chain/recommend.py b/app/chain/recommend.py index d0290625..e42b3c3c 100644 --- a/app/chain/recommend.py +++ b/app/chain/recommend.py @@ -1,18 +1,15 @@ -import inspect import io import tempfile -from functools import wraps from pathlib import Path -from typing import Any, Callable, List +from typing import Any, List from PIL import Image -from cachetools import TTLCache -from cachetools.keys import hashkey from app.chain import ChainBase from app.chain.bangumi import BangumiChain from app.chain.douban import DoubanChain from app.chain.tmdb import TmdbChain +from app.core.cache import cache_backend, cached from app.core.config import settings, global_vars from app.log import logger from app.schemas import MediaType @@ -23,42 +20,7 @@ from app.utils.singleton import Singleton # 推荐相关的专用缓存 recommend_ttl = 24 * 3600 -recommend_cache = TTLCache(maxsize=256, ttl=recommend_ttl) - - -# 推荐缓存装饰器,避免偶发网络获取数据为空时,页面由于缓存为空长时间渲染异常问题 -def cached_with_empty_check(func: Callable): - """ - 缓存装饰器,用于缓存函数的返回结果,仅在结果非空时进行缓存 - - :param func: 被装饰的函数 - :return: 包装后的函数 - """ - - @wraps(func) - def wrapper(*args, **kwargs): - signature = inspect.signature(func) - resolved_kwargs = {} - # 获取默认值并结合传递的参数(如果有) - for param, value in signature.parameters.items(): - if param in kwargs: - # 使用显式传递的参数 - resolved_kwargs[param] = kwargs[param] - elif value.default is not inspect.Parameter.empty: - # 没有传递参数时使用默认值 - resolved_kwargs[param] = value.default - # 使用 cachetools 缓存,构造缓存键 - cache_key = f"{func.__name__}_{hashkey(*args, **resolved_kwargs)}" - if cache_key in recommend_cache: - return recommend_cache[cache_key] - result = func(*args, **kwargs) - # 如果返回值为空,则不缓存 - if result in [None, [], {}]: - return result - recommend_cache[cache_key] = result - return result - - return wrapper +recommend_cache_region = "recommend" class RecommendChain(ChainBase, metaclass=Singleton): @@ -78,7 +40,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): 刷新推荐 """ logger.debug("Starting to refresh Recommend data.") - recommend_cache.clear() + cache_backend.clear(region=recommend_cache_region) logger.debug("Recommend Cache has been cleared.") # 推荐来源方法 @@ -194,7 +156,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}") @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def tmdb_movies(self, sort_by: str = "popularity.desc", with_genres: str = "", with_original_language: str = "", page: int = 1) -> Any: """ @@ -208,7 +170,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [movie.to_dict() for movie in movies] if movies else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def tmdb_tvs(self, sort_by: str = "popularity.desc", with_genres: str = "", with_original_language: str = "zh|en|ja|ko", page: int = 1) -> Any: """ @@ -222,7 +184,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [tv.to_dict() for tv in tvs] if tvs else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def tmdb_trending(self, page: int = 1) -> Any: """ TMDB流行趋势 @@ -231,7 +193,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [info.to_dict() for info in infos] if infos else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def bangumi_calendar(self, page: int = 1, count: int = 30) -> Any: """ Bangumi每日放送 @@ -240,7 +202,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in medias[(page - 1) * count: page * count]] if medias else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_movie_showing(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣正在热映 @@ -249,7 +211,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_movies(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any: """ 豆瓣最新电影 @@ -259,7 +221,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_tvs(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any: """ 豆瓣最新电视剧 @@ -269,7 +231,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_movie_top250(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣电影TOP250 @@ -278,7 +240,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_tv_weekly_chinese(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣国产剧集榜 @@ -287,7 +249,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_tv_weekly_global(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣全球剧集榜 @@ -296,7 +258,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_tv_animation(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣热门动漫 @@ -305,7 +267,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in tvs] if tvs else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_movie_hot(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣热门电影 @@ -314,7 +276,7 @@ class RecommendChain(ChainBase, metaclass=Singleton): return [media.to_dict() for media in movies] if movies else [] @log_execution_time(logger=logger) - @cached_with_empty_check + @cached(maxsize=16, ttl=recommend_ttl, region=recommend_cache_region) def douban_tv_hot(self, page: int = 1, count: int = 30) -> Any: """ 豆瓣热门电视剧 diff --git a/app/core/cache.py b/app/core/cache.py index 817d5065..86897cf5 100644 --- a/app/core/cache.py +++ b/app/core/cache.py @@ -9,6 +9,9 @@ import redis from cachetools import TTLCache from cachetools.keys import hashkey +# 默认缓存区 +DEFAULT_CACHE_REGION = "DEFAULT" + class CacheBackend(ABC): """ @@ -16,7 +19,7 @@ class CacheBackend(ABC): """ @abstractmethod - def set(self, key: str, value: Any, ttl: int, region: str = "default", **kwargs) -> None: + def set(self, key: str, value: Any, ttl: int, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None: """ 设置缓存 @@ -29,7 +32,7 @@ class CacheBackend(ABC): pass @abstractmethod - def get(self, key: str, region: str = "default") -> Any: + def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any: """ 获取缓存 @@ -40,7 +43,7 @@ class CacheBackend(ABC): pass @abstractmethod - def delete(self, key: str, region: str = "default") -> None: + def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None: """ 删除缓存 @@ -59,7 +62,7 @@ class CacheBackend(ABC): pass @staticmethod - def get_region(region: str = "default"): + def get_region(region: str = DEFAULT_CACHE_REGION): """ 获取缓存的区 """ @@ -83,7 +86,7 @@ class CacheToolsBackend(CacheBackend): # 存储各个 region 的缓存实例,region -> {key -> TTLCache} self._region_caches: Dict[str, Dict[str, TTLCache]] = defaultdict(dict) - def set(self, key: str, value: Any, ttl: int = None, region: str = "default", **kwargs) -> None: + def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None: """ 设置缓存值支持每个 key 独立配置 TTL 和 Maxsize @@ -105,7 +108,7 @@ class CacheToolsBackend(CacheBackend): # 设置缓存值 cache[key] = value - def get(self, key: str, region: str = "default") -> Any: + def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any: """ 获取缓存的值 @@ -121,7 +124,7 @@ class CacheToolsBackend(CacheBackend): cache = region_cache[key] return cache.get(key) - def delete(self, key: str, region: str = "default") -> None: + def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None: """ 删除缓存 @@ -143,6 +146,7 @@ class CacheToolsBackend(CacheBackend): :param region: 缓存的区 """ if region: + region = self.get_region(region) region_cache = self._region_caches[region] for cache in region_cache.values(): cache.clear() @@ -176,7 +180,7 @@ class RedisBackend(CacheBackend): # 使用 region 作为缓存键的一部分 return f"region:{region}:key:{key}" - def set(self, key: str, value: Any, ttl: int = None, region: str = "default", **kwargs) -> None: + def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None: """ 设置缓存 @@ -190,7 +194,7 @@ class RedisBackend(CacheBackend): redis_key = self.get_redis_key(region, key) self.client.setex(redis_key, ttl, value) - def get(self, key: str, region: str = "default") -> Any: + def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any: """ 获取缓存的值 @@ -202,7 +206,7 @@ class RedisBackend(CacheBackend): value = self.client.get(redis_key) return value - def delete(self, key: str, region: str = "default") -> None: + def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None: """ 删除缓存 @@ -244,11 +248,7 @@ def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend: return CacheToolsBackend(maxsize=maxsize, ttl=ttl) -# 缓存后端实例 -cache_backend = get_cache_backend() - - -def cached(region: str = "default", maxsize: int = 1000, ttl: int = 1800, +def cached(region: str = DEFAULT_CACHE_REGION, maxsize: int = 1000, ttl: int = 1800, skip_none: bool = True, skip_empty: bool = True): """ 自定义缓存装饰器,支持为每个 key 动态传递 maxsize 和 ttl @@ -318,3 +318,7 @@ def cached(region: str = "default", maxsize: int = 1000, ttl: int = 1800, return wrapper return decorator + + +# 缓存后端实例 +cache_backend = get_cache_backend() diff --git a/app/helper/subscribe.py b/app/helper/subscribe.py index 9ac6377a..4942712e 100644 --- a/app/helper/subscribe.py +++ b/app/helper/subscribe.py @@ -1,8 +1,7 @@ from threading import Thread from typing import List, Tuple -from cachetools import TTLCache, cached - +from app.core.cache import cached, cache_backend from app.core.config import settings from app.db.subscribe_oper import SubscribeOper from app.db.systemconfig_oper import SystemConfigOper @@ -31,7 +30,7 @@ class SubscribeHelper(metaclass=Singleton): _sub_fork = f"{settings.MP_SERVER_HOST}/subscribe/fork/%s" - _shares_cache = TTLCache(maxsize=20, ttl=1800) + _shares_cache_region = "subscribe_share" def __init__(self): self.systemconfig = SystemConfigOper() @@ -41,7 +40,7 @@ class SubscribeHelper(metaclass=Singleton): if self.sub_report(): self.systemconfig.set(SystemConfigKey.SubscribeReport, "1") - @cached(cache=TTLCache(maxsize=20, ttl=1800)) + @cached(maxsize=20, ttl=1800) def get_statistic(self, stype: str, page: int = 1, count: int = 30) -> List[dict]: """ 获取订阅统计数据 @@ -129,6 +128,7 @@ class SubscribeHelper(metaclass=Singleton): return False, "订阅不存在" subscribe_dict = subscribe.to_dict() subscribe_dict.pop("id") + cache_backend.clear(region=self._shares_cache_region) res = RequestUtils(proxies=settings.PROXY, content_type="application/json", timeout=10).post(self._sub_share, json={ @@ -142,7 +142,7 @@ class SubscribeHelper(metaclass=Singleton): return False, "连接MoviePilot服务器失败" if res.ok: # 清除 get_shares 的缓存,以便实时看到结果 - self._shares_cache.clear() + cache_backend.clear(region=self._shares_cache_region) return True, "" else: return False, res.json().get("message") @@ -160,7 +160,7 @@ class SubscribeHelper(metaclass=Singleton): return False, "连接MoviePilot服务器失败" if res.ok: # 清除 get_shares 的缓存,以便实时看到结果 - self._shares_cache.clear() + cache_backend.clear(region=self._shares_cache_region) return True, "" else: return False, res.json().get("message") @@ -181,7 +181,7 @@ class SubscribeHelper(metaclass=Singleton): else: return False, res.json().get("message") - @cached(cache=_shares_cache) + @cached(region=_shares_cache_region) def get_shares(self, name: str, page: int = 1, count: int = 30) -> List[dict]: """ 获取订阅分享数据