fix async cache

This commit is contained in:
jxxghp
2025-08-23 18:34:47 +08:00
parent 31f342fe4f
commit 7cc3777a60
2 changed files with 207 additions and 46 deletions

View File

@@ -472,6 +472,128 @@ class MemoryBackend(CacheBackend):
pass
class AsyncMemoryBackend(AsyncCacheBackend):
"""
基于 `cachetools.TTLCache` 实现的异步缓存后端
"""
def __init__(self, maxsize: Optional[int] = None, ttl: Optional[int] = None):
"""
初始化缓存实例
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.maxsize = maxsize or 1024 # 未设置时默认最大条目数为 1024
self.ttl = ttl
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, MemoryTTLCache] = {}
def __get_region_cache(self, region: str) -> Optional[MemoryTTLCache]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
region = self.get_region(region)
return self._region_caches.get(region)
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存值支持每个 key 独立配置 TTL
:param key: 缓存的键
:param value: 缓存的值
:param ttl: 缓存的存活时间,不传入为永久缓存,单位秒
:param region: 缓存的区
"""
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(region, MemoryTTLCache(maxsize=maxsize, ttl=ttl))
# 设置缓存值
with lock:
region_cache[key] = value
async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
"""
判断缓存键是否存在
:param key: 缓存的键
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return False
return key in region_cache
async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存的值
:param key: 缓存的键
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
return region_cache.get(key)
async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION):
"""
删除缓存
:param key: 缓存的键
:param region: 缓存的区
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
with lock:
del region_cache[key]
async def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
"""
清除指定区域的缓存或全部缓存
:param region: 缓存的区为None时清空所有区缓存
"""
if region:
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
with lock:
region_cache.clear()
logger.info(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
with lock:
region_cache.clear()
logger.info("Cleared all cache")
async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Tuple[str, Any], None]:
"""
获取指定区域的所有缓存项
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
for item in region_cache.items():
yield item
async def close(self) -> None:
"""
内存缓存不需要关闭资源
"""
pass
class RedisBackend(CacheBackend):
"""
基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存
@@ -900,6 +1022,21 @@ def Cache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> CacheBack
return MemoryBackend(maxsize=maxsize, ttl=ttl)
def AsyncCache(maxsize: Optional[int] = None, ttl: Optional[int] = None) -> AsyncCacheBackend:
"""
根据配置获取异步缓存后端实例内存或Redismaxsize仅在未启用Redis时生效
:param maxsize: 缓存的最大条目数仅使用cachetools时生效
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回异步缓存后端实例
"""
if settings.CACHE_BACKEND_TYPE == "redis":
return AsyncRedisBackend(ttl=ttl)
else:
# 使用异步内存缓存maxsize需要有值
return AsyncMemoryBackend(maxsize=maxsize, ttl=ttl)
def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None,
skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False):
"""
@@ -912,39 +1049,62 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
:param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False
:return: 装饰器函数
"""
# 缓存后端实例
cache_backend = Cache(maxsize=maxsize, ttl=ttl)
def should_cache(value: Any) -> bool:
"""
判断是否应该缓存结果,如果返回值是 None 或空值则不缓存
:param value: 要判断的缓存值
:return: 是否缓存结果
"""
if skip_none and value is None:
return False
# if skip_empty and value in [None, [], {}, "", set()]:
if skip_empty and not value:
return False
return True
def is_valid_cache_value(cache_key: str, cached_value: Any, cache_region: str) -> bool:
"""
判断指定的值是否为一个有效的缓存值
:param cache_key: 缓存的键
:param cached_value: 缓存的值
:param cache_region: 缓存的区
:return: 若值是有效的缓存值返回 True否则返回 False
"""
# 如果 skip_none 为 False且 value 为 None需要判断缓存实际是否存在
if not skip_none and cached_value is None:
if not cache_backend.exists(key=cache_key, region=cache_region):
return False
return True
def decorator(func):
# 检查是否为异步函数
is_async = inspect.iscoroutinefunction(func)
# 根据函数类型选择对应的缓存后端
if is_async:
# 异步函数使用异步缓存后端
cache_backend = AsyncCache(maxsize=maxsize, ttl=ttl)
else:
# 同步函数使用同步缓存后端
cache_backend = Cache(maxsize=maxsize, ttl=ttl)
def should_cache(value: Any) -> bool:
"""
判断是否应该缓存结果,如果返回值是 None 或空值则不缓存
:param value: 要判断的缓存值
:return: 是否缓存结果
"""
if skip_none and value is None:
return False
# if skip_empty and value in [None, [], {}, "", set()]:
if skip_empty and not value:
return False
return True
def is_valid_cache_value(_cache_key: str, _cached_value: Any, _cache_region: str) -> bool:
"""
判断指定的值是否为一个有效的缓存值
:param _cache_key: 缓存的键
:param _cached_value: 缓存的值
:param _cache_region: 缓存的区
:return: 若值是有效的缓存值返回 True否则返回 False
"""
# 如果 skip_none 为 False且 value 为 None需要判断缓存实际是否存在
if not skip_none and _cached_value is None:
if not cache_backend.exists(key=_cache_key, region=_cache_region):
return False
return True
async def async_is_valid_cache_value(_cache_key: str, _cached_value: Any, _cache_region: str) -> bool:
"""
判断指定的值是否为一个有效的缓存值(异步版本)
:param _cache_key: 缓存的键
:param _cached_value: 缓存的值
:param _cache_region: 缓存的区
:return: 若值是有效的缓存值返回 True否则返回 False
"""
# 如果 skip_none 为 False且 value 为 None需要判断缓存实际是否存在
if not skip_none and _cached_value is None:
if not await cache_backend.exists(key=_cache_key, region=_cache_region):
return False
return True
def __get_cache_key(args, kwargs) -> str:
"""
@@ -982,8 +1142,9 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
# 获取缓存键
cache_key = __get_cache_key(args, kwargs)
# 尝试获取缓存
cached_value = cache_backend.get(cache_key, region=cache_region)
if should_cache(cached_value) and is_valid_cache_value(cache_key, cached_value, cache_region):
cached_value = await cache_backend.get(cache_key, region=cache_region)
if should_cache(cached_value) and await async_is_valid_cache_value(cache_key, cached_value,
cache_region):
return cached_value
# 执行异步函数并缓存结果
result = await func(*args, **kwargs)
@@ -991,14 +1152,14 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
if not should_cache(result):
return result
# 设置缓存(如果有传入的 maxsize 和 ttl则覆盖默认值
cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region)
await cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region)
return result
def cache_clear():
async def cache_clear():
"""
清理缓存区
"""
cache_backend.clear(region=cache_region)
await cache_backend.clear(region=cache_region)
async_wrapper.cache_region = cache_region
async_wrapper.cache_clear = cache_clear
@@ -1130,7 +1291,7 @@ class TTLCache(CacheProxy):
使用项目的缓存后端实现,支持 Redis 和内存缓存
"""
def __init__(self, maxsize: int = 1024, ttl: int = 600, region: Optional[str] = None):
def __init__(self, maxsize: int = 1024, ttl: int = None, region: Optional[str] = None):
"""
初始化 TTL 缓存
@@ -1147,7 +1308,7 @@ class LRUCache(CacheProxy):
使用项目的缓存后端实现,支持 Redis 和内存缓存
"""
def __init__(self, maxsize: int = 128, region: Optional[str] = None):
def __init__(self, maxsize: int = 1024, region: Optional[str] = None):
"""
初始化 LRU 缓存

View File

@@ -140,18 +140,18 @@ class RedisHelper(metaclass=Singleton):
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def __get_region(region: Optional[str] = "DEFAULT"):
def __get_region(region: Optional[str] = None):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
return f"region:{quote(region)}" if region else "region:DEFAULT"
def __make_redis_key(self, region: str, key: str) -> str:
"""
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.__get_region(quote(region))
region = self.__get_region(region)
return f"{region}:key:{quote(key)}"
@staticmethod
@@ -247,7 +247,7 @@ class RedisHelper(metaclass=Singleton):
try:
self._connect()
if region:
cache_region = self.__get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
with self.client.pipeline() as pipe:
for key in self.client.scan_iter(redis_key):
@@ -270,7 +270,7 @@ class RedisHelper(metaclass=Singleton):
try:
self._connect()
if region:
cache_region = self.__get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
for key in self.client.scan_iter(redis_key):
value = self.client.get(key)
@@ -392,7 +392,7 @@ class AsyncRedisHelper(metaclass=Singleton):
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.__get_region(quote(region))
region = self.__get_region(region)
return f"{region}:key:{quote(key)}"
@staticmethod
@@ -489,7 +489,7 @@ class AsyncRedisHelper(metaclass=Singleton):
try:
await self._connect()
if region:
cache_region = self.__get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
async with self.client.pipeline() as pipe:
async for key in self.client.scan_iter(redis_key):
@@ -512,7 +512,7 @@ class AsyncRedisHelper(metaclass=Singleton):
try:
await self._connect()
if region:
cache_region = self.__get_region(quote(region))
cache_region = self.__get_region(region)
redis_key = f"{cache_region}:key:*"
async for key in self.client.scan_iter(redis_key):
value = await self.client.get(key)