diff --git a/app/core/cache.py b/app/core/cache.py index ea627d3e..0a1262e4 100644 --- a/app/core/cache.py +++ b/app/core/cache.py @@ -472,6 +472,128 @@ class MemoryBackend(CacheBackend): pass +class AsyncMemoryBackend(AsyncCacheBackend): + """ + 基于 `cachetools.TTLCache` 实现的异步缓存后端 + """ + + def __init__(self, maxsize: Optional[int] = None, ttl: Optional[int] = None): + """ + 初始化缓存实例 + + :param maxsize: 缓存的最大条目数 + :param ttl: 默认缓存存活时间,单位秒 + """ + self.maxsize = maxsize or 1024 # 未设置时默认最大条目数为 1024 + self.ttl = ttl + # 存储各个 region 的缓存实例,region -> TTLCache + self._region_caches: Dict[str, MemoryTTLCache] = {} + + def __get_region_cache(self, region: str) -> Optional[MemoryTTLCache]: + """ + 获取指定区域的缓存实例,如果不存在则返回 None + """ + region = self.get_region(region) + return self._region_caches.get(region) + + async def set(self, key: str, value: Any, ttl: Optional[int] = None, + region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: + """ + 设置缓存值支持每个 key 独立配置 TTL + + :param key: 缓存的键 + :param value: 缓存的值 + :param ttl: 缓存的存活时间,不传入为永久缓存,单位秒 + :param region: 缓存的区 + """ + ttl = ttl or self.ttl + maxsize = kwargs.get("maxsize", self.maxsize) + region = self.get_region(region) + # 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例 + region_cache = self._region_caches.setdefault(region, MemoryTTLCache(maxsize=maxsize, ttl=ttl)) + # 设置缓存值 + with lock: + region_cache[key] = value + + async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: + """ + 判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回 True,否则返回 False + """ + region_cache = self.__get_region_cache(region) + if region_cache is None: + return False + return key in region_cache + + async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any: + """ + 获取缓存的值 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回 None + """ + region_cache = self.__get_region_cache(region) + if region_cache is None: + return None + return region_cache.get(key) + + async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION): + """ + 删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + region_cache = self.__get_region_cache(region) + if region_cache is None: + return + with lock: + del region_cache[key] + + async def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: + """ + 清除指定区域的缓存或全部缓存 + + :param region: 缓存的区,为None时清空所有区缓存 + """ + if region: + # 清理指定缓存区 + region_cache = self.__get_region_cache(region) + if region_cache: + with lock: + region_cache.clear() + logger.info(f"Cleared cache for region: {region}") + else: + # 清除所有区域的缓存 + for region_cache in self._region_caches.values(): + with lock: + region_cache.clear() + logger.info("Cleared all cache") + + async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Tuple[str, Any], None]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + region_cache = self.__get_region_cache(region) + if region_cache is None: + return + for item in region_cache.items(): + yield item + + async def close(self) -> None: + """ + 内存缓存不需要关闭资源 + """ + pass + + class RedisBackend(CacheBackend): """ 基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存 @@ -900,6 +1022,21 @@ def Cache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> CacheBack return MemoryBackend(maxsize=maxsize, ttl=ttl) +def AsyncCache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> AsyncCacheBackend: + """ + 根据配置获取异步缓存后端实例(内存或Redis),maxsize仅在未启用Redis时生效 + + :param maxsize: 缓存的最大条目数,仅使用cachetools时生效 + :param ttl: 缓存的默认存活时间,单位秒 + :return: 返回异步缓存后端实例 + """ + if settings.CACHE_BACKEND_TYPE == "redis": + return AsyncRedisBackend(ttl=ttl) + else: + # 使用异步内存缓存,maxsize需要有值 + return AsyncMemoryBackend(maxsize=maxsize, ttl=ttl) + + def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None, skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False): """ @@ -912,39 +1049,62 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt :param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False :return: 装饰器函数 """ - # 缓存后端实例 - cache_backend = Cache(maxsize=maxsize, ttl=ttl) - - def should_cache(value: Any) -> bool: - """ - 判断是否应该缓存结果,如果返回值是 None 或空值则不缓存 - - :param value: 要判断的缓存值 - :return: 是否缓存结果 - """ - if skip_none and value is None: - return False - # if skip_empty and value in [None, [], {}, "", set()]: - if skip_empty and not value: - return False - return True - - def is_valid_cache_value(cache_key: str, cached_value: Any, cache_region: str) -> bool: - """ - 判断指定的值是否为一个有效的缓存值 - - :param cache_key: 缓存的键 - :param cached_value: 缓存的值 - :param cache_region: 缓存的区 - :return: 若值是有效的缓存值返回 True,否则返回 False - """ - # 如果 skip_none 为 False,且 value 为 None,需要判断缓存实际是否存在 - if not skip_none and cached_value is None: - if not cache_backend.exists(key=cache_key, region=cache_region): - return False - return True def decorator(func): + # 检查是否为异步函数 + is_async = inspect.iscoroutinefunction(func) + + # 根据函数类型选择对应的缓存后端 + if is_async: + # 异步函数使用异步缓存后端 + cache_backend = AsyncCache(maxsize=maxsize, ttl=ttl) + else: + # 同步函数使用同步缓存后端 + cache_backend = Cache(maxsize=maxsize, ttl=ttl) + + def should_cache(value: Any) -> bool: + """ + 判断是否应该缓存结果,如果返回值是 None 或空值则不缓存 + + :param value: 要判断的缓存值 + :return: 是否缓存结果 + """ + if skip_none and value is None: + return False + # if skip_empty and value in [None, [], {}, "", set()]: + if skip_empty and not value: + return False + return True + + def is_valid_cache_value(_cache_key: str, _cached_value: Any, _cache_region: str) -> bool: + """ + 判断指定的值是否为一个有效的缓存值 + + :param _cache_key: 缓存的键 + :param _cached_value: 缓存的值 + :param _cache_region: 缓存的区 + :return: 若值是有效的缓存值返回 True,否则返回 False + """ + # 如果 skip_none 为 False,且 value 为 None,需要判断缓存实际是否存在 + if not skip_none and _cached_value is None: + if not cache_backend.exists(key=_cache_key, region=_cache_region): + return False + return True + + async def async_is_valid_cache_value(_cache_key: str, _cached_value: Any, _cache_region: str) -> bool: + """ + 判断指定的值是否为一个有效的缓存值(异步版本) + + :param _cache_key: 缓存的键 + :param _cached_value: 缓存的值 + :param _cache_region: 缓存的区 + :return: 若值是有效的缓存值返回 True,否则返回 False + """ + # 如果 skip_none 为 False,且 value 为 None,需要判断缓存实际是否存在 + if not skip_none and _cached_value is None: + if not await cache_backend.exists(key=_cache_key, region=_cache_region): + return False + return True def __get_cache_key(args, kwargs) -> str: """ @@ -982,8 +1142,9 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt # 获取缓存键 cache_key = __get_cache_key(args, kwargs) # 尝试获取缓存 - cached_value = cache_backend.get(cache_key, region=cache_region) - if should_cache(cached_value) and is_valid_cache_value(cache_key, cached_value, cache_region): + cached_value = await cache_backend.get(cache_key, region=cache_region) + if should_cache(cached_value) and await async_is_valid_cache_value(cache_key, cached_value, + cache_region): return cached_value # 执行异步函数并缓存结果 result = await func(*args, **kwargs) @@ -991,14 +1152,14 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt if not should_cache(result): return result # 设置缓存(如果有传入的 maxsize 和 ttl,则覆盖默认值) - cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region) + await cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region) return result - def cache_clear(): + async def cache_clear(): """ 清理缓存区 """ - cache_backend.clear(region=cache_region) + await cache_backend.clear(region=cache_region) async_wrapper.cache_region = cache_region async_wrapper.cache_clear = cache_clear @@ -1130,7 +1291,7 @@ class TTLCache(CacheProxy): 使用项目的缓存后端实现,支持 Redis 和内存缓存 """ - def __init__(self, maxsize: int = 1024, ttl: int = 600, region: Optional[str] = None): + def __init__(self, maxsize: int = 1024, ttl: int = None, region: Optional[str] = None): """ 初始化 TTL 缓存 @@ -1147,7 +1308,7 @@ class LRUCache(CacheProxy): 使用项目的缓存后端实现,支持 Redis 和内存缓存 """ - def __init__(self, maxsize: int = 128, region: Optional[str] = None): + def __init__(self, maxsize: int = 1024, region: Optional[str] = None): """ 初始化 LRU 缓存 diff --git a/app/helper/redis.py b/app/helper/redis.py index a74a0be6..cd1d6a91 100644 --- a/app/helper/redis.py +++ b/app/helper/redis.py @@ -140,18 +140,18 @@ class RedisHelper(metaclass=Singleton): logger.error(f"Failed to set Redis maxmemory or policy: {e}") @staticmethod - def __get_region(region: Optional[str] = "DEFAULT"): + def __get_region(region: Optional[str] = None): """ 获取缓存的区 """ - return f"region:{region}" if region else "region:default" + return f"region:{quote(region)}" if region else "region:DEFAULT" def __make_redis_key(self, region: str, key: str) -> str: """ 获取缓存Key """ # 使用region作为缓存键的一部分 - region = self.__get_region(quote(region)) + region = self.__get_region(region) return f"{region}:key:{quote(key)}" @staticmethod @@ -247,7 +247,7 @@ class RedisHelper(metaclass=Singleton): try: self._connect() if region: - cache_region = self.__get_region(quote(region)) + cache_region = self.__get_region(region) redis_key = f"{cache_region}:key:*" with self.client.pipeline() as pipe: for key in self.client.scan_iter(redis_key): @@ -270,7 +270,7 @@ class RedisHelper(metaclass=Singleton): try: self._connect() if region: - cache_region = self.__get_region(quote(region)) + cache_region = self.__get_region(region) redis_key = f"{cache_region}:key:*" for key in self.client.scan_iter(redis_key): value = self.client.get(key) @@ -392,7 +392,7 @@ class AsyncRedisHelper(metaclass=Singleton): 获取缓存Key """ # 使用region作为缓存键的一部分 - region = self.__get_region(quote(region)) + region = self.__get_region(region) return f"{region}:key:{quote(key)}" @staticmethod @@ -489,7 +489,7 @@ class AsyncRedisHelper(metaclass=Singleton): try: await self._connect() if region: - cache_region = self.__get_region(quote(region)) + cache_region = self.__get_region(region) redis_key = f"{cache_region}:key:*" async with self.client.pipeline() as pipe: async for key in self.client.scan_iter(redis_key): @@ -512,7 +512,7 @@ class AsyncRedisHelper(metaclass=Singleton): try: await self._connect() if region: - cache_region = self.__get_region(quote(region)) + cache_region = self.__get_region(region) redis_key = f"{cache_region}:key:*" async for key in self.client.scan_iter(redis_key): value = await self.client.get(key)