diff --git a/app/core/cache.py b/app/core/cache.py index 2bb1f1e9..444488e7 100644 --- a/app/core/cache.py +++ b/app/core/cache.py @@ -27,8 +27,6 @@ DEFAULT_CACHE_SIZE = 1024 # 默认缓存有效期 DEFAULT_CACHE_TTL = 365 * 24 * 60 * 60 -lock = threading.Lock() - # 上下文变量来控制缓存行为 _fresh = contextvars.ContextVar('fresh', default=False) @@ -297,14 +295,14 @@ class AsyncCacheBackend(CacheBackend): """ 获取所有缓存键,类似 dict.keys()(异步) """ - async for key, _ in await self.items(region=region): + async for key, _ in self.items(region=region): yield key async def values(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Any, None]: """ 获取所有缓存值,类似 dict.values()(异步) """ - async for _, value in await self.items(region=region): + async for _, value in self.items(region=region): yield value async def update(self, other: Dict[str, Any], region: Optional[str] = DEFAULT_CACHE_REGION, @@ -332,7 +330,7 @@ class AsyncCacheBackend(CacheBackend): 弹出最后一个缓存项,类似 dict.popitem()(异步) """ items = [] - async for item in await self.items(region=region): + async for item in self.items(region=region): items.append(item) if not items: raise KeyError("popitem(): cache is empty") @@ -364,6 +362,11 @@ class MemoryBackend(CacheBackend): 基于 `cachetools.TTLCache` 实现的缓存后端 """ + # 类变量 _region_caches 的互斥锁 + _lock = threading.Lock() + # 存储各个 region 的缓存实例,region -> TTLCache + _region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {} + def __init__(self, cache_type: Literal['ttl', 'lru'] = 'ttl', maxsize: Optional[int] = None, ttl: Optional[int] = None): """ @@ -376,8 +379,6 @@ class MemoryBackend(CacheBackend): self.cache_type = cache_type self.maxsize = maxsize or DEFAULT_CACHE_SIZE self.ttl = ttl or DEFAULT_CACHE_TTL - # 存储各个 region 的缓存实例,region -> TTLCache - self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {} def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]: """ @@ -400,7 +401,7 @@ class MemoryBackend(CacheBackend): maxsize = kwargs.get("maxsize", self.maxsize) region = self.get_region(region) # 设置缓存值 - with lock: + with self._lock: # 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例 region_cache = self._region_caches.setdefault( region, @@ -445,7 +446,7 @@ class MemoryBackend(CacheBackend): region_cache = self.__get_region_cache(region) if region_cache is None: return - with lock: + with self._lock: del region_cache[key] def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: @@ -458,13 +459,13 @@ class MemoryBackend(CacheBackend): # 清理指定缓存区 region_cache = self.__get_region_cache(region) if region_cache: - with lock: + with self._lock: region_cache.clear() logger.debug(f"Cleared cache for region: {region}") else: # 清除所有区域的缓存 for region_cache in self._region_caches.values(): - with lock: + with self._lock: region_cache.clear() logger.info("Cleared all cache") @@ -480,7 +481,7 @@ class MemoryBackend(CacheBackend): yield from () return # 使用锁保护迭代过程,避免在迭代时缓存被修改 - with lock: + with self._lock: # 创建快照避免并发修改问题 items_snapshot = list(region_cache.items()) for item in items_snapshot: @@ -507,18 +508,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param maxsize: 缓存的最大条目数 :param ttl: 默认缓存存活时间,单位秒 """ - self.cache_type = cache_type - self.maxsize = maxsize or DEFAULT_CACHE_SIZE - self.ttl = ttl or DEFAULT_CACHE_TTL - # 存储各个 region 的缓存实例,region -> TTLCache - self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {} - - def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]: - """ - 获取指定区域的缓存实例,如果不存在则返回 None - """ - region = self.get_region(region) - return self._region_caches.get(region) + self._backend = MemoryBackend(cache_type=cache_type, maxsize=maxsize, ttl=ttl) async def set(self, key: str, value: Any, ttl: Optional[int] = None, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: @@ -530,18 +520,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param ttl: 缓存的存活时间,不传入为永久缓存,单位秒 :param region: 缓存的区 """ - ttl = ttl or self.ttl - maxsize = kwargs.get("maxsize", self.maxsize) - region = self.get_region(region) - # 设置缓存值 - with lock: - # 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例 - region_cache = self._region_caches.setdefault( - region, - MemoryTTLCache(maxsize=maxsize, ttl=ttl) if self.cache_type == 'ttl' - else MemoryLRUCache(maxsize=maxsize) - ) - region_cache[key] = value + return self._backend.set(key=key, value=value, ttl=ttl, region=region, **kwargs) async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: """ @@ -551,10 +530,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param region: 缓存的区 :return: 存在返回 True,否则返回 False """ - region_cache = self.__get_region_cache(region) - if region_cache is None: - return False - return key in region_cache + return self._backend.exists(key=key, region=region) async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any: """ @@ -564,10 +540,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param region: 缓存的区 :return: 返回缓存的值,如果缓存不存在返回 None """ - region_cache = self.__get_region_cache(region) - if region_cache is None: - return None - return region_cache.get(key) + return self._backend.get(key=key, region=region) async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION): """ @@ -576,11 +549,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param key: 缓存的键 :param region: 缓存的区 """ - region_cache = self.__get_region_cache(region) - if region_cache is None: - return - with lock: - del region_cache[key] + return self._backend.delete(key=key, region=region) async def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: """ @@ -588,19 +557,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param region: 缓存的区,为None时清空所有区缓存 """ - if region: - # 清理指定缓存区 - region_cache = self.__get_region_cache(region) - if region_cache: - with lock: - region_cache.clear() - logger.debug(f"Cleared cache for region: {region}") - else: - # 清除所有区域的缓存 - for region_cache in self._region_caches.values(): - with lock: - region_cache.clear() - logger.info("All cache cleared!") + return self._backend.clear(region=region) async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Tuple[str, Any], None]: """ @@ -609,14 +566,7 @@ class AsyncMemoryBackend(AsyncCacheBackend): :param region: 缓存的区 :return: 返回一个字典,包含所有缓存键值对 """ - region_cache = self.__get_region_cache(region) - if region_cache is None: - return - # 使用锁保护迭代过程,避免在迭代时缓存被修改 - with lock: - # 创建快照避免并发修改问题 - items_snapshot = list(region_cache.items()) - for item in items_snapshot: + for item in self._backend.items(region): yield item async def close(self) -> None: @@ -1115,15 +1065,16 @@ def AsyncCache(cache_type: Literal['ttl', 'lru'] = 'ttl', def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None, - skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False): + skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False, shared_key: Optional[str] = None): """ 自定义缓存装饰器,支持为每个 key 动态传递 maxsize 和 ttl - :param region: 缓存的区 - :param maxsize: 缓存的最大条目数 + :param region: 缓存区域的标识符,默认根据模块名、函数名等自动生成标识 + :param maxsize: 缓存区内的最大条目数 :param ttl: 缓存的存活时间,单位秒,未传入则为永久缓存,单位秒 :param skip_none: 跳过 None 缓存,默认为 True :param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False + :param shared_key: 同步/异步函数共享缓存的键,默认使用函数名(异步函数名会标准化为同步格式,如移除 `async_` 前缀) :return: 装饰器函数 """ @@ -1173,6 +1124,17 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt return False return True + def __standardize_func_name() -> str: + """ + 将异步函数名标准化为同步函数的命名,以生成统一的缓存键 + """ + # XXX 假设异步函数名与同步版本仅差`async_`前缀或`_async`后缀(当前MP代码大多符合),否则需通过`shared_key`参数显式指定 + return ( + func.__name__.removeprefix("async_").removesuffix("_async") + if is_async + else func.__name__ + ) + def __get_cache_key(args, kwargs) -> str: """ 根据函数和参数生成缓存键 @@ -1194,13 +1156,22 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt bound.arguments[param] for param in signature.parameters if param in bound.arguments ] # 使用有序参数生成缓存键 - return f"{func.__name__}_{hashkey(*keys)}" - - # 获取缓存区 - cache_region = region if region is not None else f"{func.__module__}.{func.__name__}" + return f"{func_name}_{hashkey(*keys)}" + # 被装饰函数的上层名称(如类名或外层函数名) + enclosing_name = ( + func.__qualname__[:last_dot] + if (last_dot := func.__qualname__.rfind(".")) != -1 + else "" + ) # 检查是否为异步函数 is_async = inspect.iscoroutinefunction(func) + # 生成标准化后的函数名称,用于同步/异步函数共享缓存 + func_name = shared_key if shared_key else __standardize_func_name() + # 获取缓存区 + cache_region = ( + region if region is not None else f"{func.__module__}:{enclosing_name}:{func_name}" + ) if is_async: # 异步函数使用异步缓存后端 diff --git a/app/modules/bangumi/__init__.py b/app/modules/bangumi/__init__.py index 4721c35f..db8f6e6a 100644 --- a/app/modules/bangumi/__init__.py +++ b/app/modules/bangumi/__init__.py @@ -290,3 +290,11 @@ class BangumiModule(_ModuleBase): if infos: return [MediaInfo(bangumi_info=info) for info in infos] return [] + + def clear_cache(self): + """ + 清除缓存 + """ + logger.info(f"开始清除{self.get_name()}缓存 ...") + self.bangumiapi.clear_cache() + logger.info(f"{self.get_name()}缓存清除完成") diff --git a/app/modules/bangumi/bangumi.py b/app/modules/bangumi/bangumi.py index 020f8bc6..467addbe 100644 --- a/app/modules/bangumi/bangumi.py +++ b/app/modules/bangumi/bangumi.py @@ -31,7 +31,7 @@ class BangumiApi(object): self._req = RequestUtils(ua=settings.NORMAL_USER_AGENT, session=self._session) self._async_req = AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT) - @cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta) + @cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta, shared_key="get") def __invoke(self, url, key: Optional[str] = None, **kwargs): req_url = self._base_url + url params = {} @@ -47,7 +47,7 @@ class BangumiApi(object): print(e) return None - @cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta) + @cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta, shared_key="get") async def __async_invoke(self, url, key: Optional[str] = None, **kwargs): req_url = self._base_url + url params = {} @@ -300,6 +300,12 @@ class BangumiApi(object): key="data", _ts=datetime.strftime(datetime.now(), '%Y%m%d'), **kwargs) + def clear_cache(self): + """ + 清除缓存 + """ + self.__invoke.cache_clear() + def close(self): if self._session: self._session.close() diff --git a/app/modules/douban/apiv2.py b/app/modules/douban/apiv2.py index 7a3586d2..69d7019b 100644 --- a/app/modules/douban/apiv2.py +++ b/app/modules/douban/apiv2.py @@ -154,7 +154,6 @@ class DoubanApi(metaclass=WeakSingleton): _api_url = "https://api.douban.com/v2" def __init__(self): - self.__clear_async_cache__ = False self._session = requests.Session() @classmethod @@ -225,7 +224,7 @@ class DoubanApi(metaclass=WeakSingleton): """ return resp.json() if resp is not None else None - @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True) + @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="get") def __invoke(self, url: str, **kwargs) -> dict: """ GET请求 @@ -237,14 +236,11 @@ class DoubanApi(metaclass=WeakSingleton): ).get_res(url=req_url, params=params) return self._handle_response(resp) - @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True) + @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="get") async def __async_invoke(self, url: str, **kwargs) -> dict: """ GET请求(异步版本) """ - if self.__clear_async_cache__: - self.__clear_async_cache__ = False - await self.__async_invoke.cache_clear() req_url, params = self._prepare_get_request(url, **kwargs) resp = await AsyncRequestUtils( ua=choice(self._user_agents) @@ -263,7 +259,7 @@ class DoubanApi(metaclass=WeakSingleton): params.pop('_ts') return req_url, params - @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True) + @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="post") def __post(self, url: str, **kwargs) -> dict: """ POST请求 @@ -285,7 +281,7 @@ class DoubanApi(metaclass=WeakSingleton): ).post_res(url=req_url, data=params) return self._handle_response(resp) - @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True) + @cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="post") async def __async_post(self, url: str, **kwargs) -> dict: """ POST请求(异步版本) @@ -865,7 +861,7 @@ class DoubanApi(metaclass=WeakSingleton): 清空LRU缓存 """ self.__invoke.cache_clear() - self.__clear_async_cache__ = True + self.__post.cache_clear() def close(self): if self._session: diff --git a/app/modules/fanart/__init__.py b/app/modules/fanart/__init__.py index 764b8106..8522ce6d 100644 --- a/app/modules/fanart/__init__.py +++ b/app/modules/fanart/__init__.py @@ -440,7 +440,7 @@ class FanartModule(_ModuleBase): return result @classmethod - @cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta) + @cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get") def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]: if media_type == MediaType.MOVIE: image_url = cls._movie_url % queryid @@ -456,3 +456,11 @@ class FanartModule(_ModuleBase): except Exception as err: logger.error(f"获取{queryid}的Fanart图片失败:{str(err)}") return None + + def clear_cache(self): + """ + 清除缓存 + """ + logger.info(f"开始清除{self.get_name()}缓存 ...") + self.__request_fanart.cache_clear() + logger.info(f"{self.get_name()}缓存清除完成") diff --git a/app/modules/indexer/spider/tnode.py b/app/modules/indexer/spider/tnode.py index c5303ca9..90c3390e 100644 --- a/app/modules/indexer/spider/tnode.py +++ b/app/modules/indexer/spider/tnode.py @@ -29,7 +29,7 @@ class TNodeSpider(metaclass=SingletonClass): self._ua = indexer.get('ua') self._timeout = indexer.get('timeout') or 15 - @cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True) + @cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True, shared_key="get_token") def __get_token(self) -> Optional[str]: if not self._domain: return @@ -43,7 +43,7 @@ class TNodeSpider(metaclass=SingletonClass): return csrf_token.group(1) return None - @cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True) + @cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True, shared_key="get_token") async def __async_get_token(self) -> Optional[str]: if not self._domain: return diff --git a/app/modules/themoviedb/tmdbapi.py b/app/modules/themoviedb/tmdbapi.py index 2a02d777..d0301835 100644 --- a/app/modules/themoviedb/tmdbapi.py +++ b/app/modules/themoviedb/tmdbapi.py @@ -1625,6 +1625,9 @@ class TmdbApi: """ 清除缓存 """ + self.match_web.cache_clear() + self.discover.discover_movies.cache_clear() + self.discover.discover_tv_shows.cache_clear() self.tmdb.cache_clear() # 私有异步方法 diff --git a/app/modules/themoviedb/tmdbv3api/tmdb.py b/app/modules/themoviedb/tmdbv3api/tmdb.py index 33652b4c..e98a07d2 100644 --- a/app/modules/themoviedb/tmdbv3api/tmdb.py +++ b/app/modules/themoviedb/tmdbv3api/tmdb.py @@ -40,8 +40,6 @@ class TMDb(object): self._reset = None self._timeout = 15 - self.__clear_async_cache__ = False - @property def page(self): return self._page @@ -129,7 +127,6 @@ class TMDb(object): return req def cache_clear(self): - self.__clear_async_cache__ = True return self.request.cache_clear() def _validate_api_key(self): @@ -200,7 +197,7 @@ class TMDb(object): if rate_limit_result: logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result) time.sleep(rate_limit_result) - return self._request_obj(action, params, call_cached, method, data, json, key) + return self._request_obj(action, params, False, method, data, json, key) json_data = req.json() self._process_json_response(json_data, is_async=False) @@ -215,10 +212,6 @@ class TMDb(object): self._validate_api_key() url = self._build_url(action, params) - if self.__clear_async_cache__: - self.__clear_async_cache__ = False - await self.async_request.cache_clear() - async with async_fresh(not call_cached or method == "POST"): req = await self.async_request(method, url, data, json, _ts=datetime.strftime(datetime.now(), '%Y%m%d')) @@ -232,7 +225,7 @@ class TMDb(object): if rate_limit_result: logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result) await asyncio.sleep(rate_limit_result) - return await self._async_request_obj(action, params, call_cached, method, data, json, key) + return await self._async_request_obj(action, params, False, method, data, json, key) json_data = req.json() self._process_json_response(json_data, is_async=True) diff --git a/app/modules/thetvdb/__init__.py b/app/modules/thetvdb/__init__.py index 29958488..de4e2711 100644 --- a/app/modules/thetvdb/__init__.py +++ b/app/modules/thetvdb/__init__.py @@ -162,3 +162,12 @@ class TheTvDbModule(_ModuleBase): except Exception as err: logger.error(f"用标题搜索TVDB剧集失败 ({title}): {str(err)}") return [] + + def clear_cache(self): + """ + 清除缓存 + """ + logger.info(f"开始清除{self.get_name()}缓存 ...") + if tvdb := self.tvdb: + tvdb.clear_cache() + logger.info(f"{self.get_name()}缓存清除完成") diff --git a/app/modules/thetvdb/tvdb_v4_official.py b/app/modules/thetvdb/tvdb_v4_official.py index 443696c3..f6e6a5b7 100644 --- a/app/modules/thetvdb/tvdb_v4_official.py +++ b/app/modules/thetvdb/tvdb_v4_official.py @@ -618,3 +618,9 @@ class TVDB: """ url = self.url.construct('user/favorites') return self.request.make_request(url) + + def clear_cache(self): + """ + 清除缓存 + """ + self.request.make_request.cache_clear()