Merge pull request #5486 from cddjr/feat/shared-sync-async-cache

This commit is contained in:
jxxghp
2026-02-10 22:11:42 +08:00
committed by GitHub
10 changed files with 100 additions and 100 deletions

View File

@@ -27,8 +27,6 @@ DEFAULT_CACHE_SIZE = 1024
# 默认缓存有效期
DEFAULT_CACHE_TTL = 365 * 24 * 60 * 60
lock = threading.Lock()
# 上下文变量来控制缓存行为
_fresh = contextvars.ContextVar('fresh', default=False)
@@ -297,14 +295,14 @@ class AsyncCacheBackend(CacheBackend):
"""
获取所有缓存键,类似 dict.keys()(异步)
"""
async for key, _ in await self.items(region=region):
async for key, _ in self.items(region=region):
yield key
async def values(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Any, None]:
"""
获取所有缓存值,类似 dict.values()(异步)
"""
async for _, value in await self.items(region=region):
async for _, value in self.items(region=region):
yield value
async def update(self, other: Dict[str, Any], region: Optional[str] = DEFAULT_CACHE_REGION,
@@ -332,7 +330,7 @@ class AsyncCacheBackend(CacheBackend):
弹出最后一个缓存项,类似 dict.popitem()(异步)
"""
items = []
async for item in await self.items(region=region):
async for item in self.items(region=region):
items.append(item)
if not items:
raise KeyError("popitem(): cache is empty")
@@ -364,6 +362,11 @@ class MemoryBackend(CacheBackend):
基于 `cachetools.TTLCache` 实现的缓存后端
"""
# 类变量 _region_caches 的互斥锁
_lock = threading.Lock()
# 存储各个 region 的缓存实例:region -> TTLCache
_region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __init__(self, cache_type: Literal['ttl', 'lru'] = 'ttl',
maxsize: Optional[int] = None, ttl: Optional[int] = None):
"""
@@ -376,8 +379,6 @@ class MemoryBackend(CacheBackend):
self.cache_type = cache_type
self.maxsize = maxsize or DEFAULT_CACHE_SIZE
self.ttl = ttl or DEFAULT_CACHE_TTL
# 存储各个 region 的缓存实例:region -> TTLCache
self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]:
"""
@@ -400,7 +401,7 @@ class MemoryBackend(CacheBackend):
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 设置缓存值
with lock:
with self._lock:
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(
region,
@@ -445,7 +446,7 @@ class MemoryBackend(CacheBackend):
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
with lock:
with self._lock:
del region_cache[key]
def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
@@ -458,13 +459,13 @@ class MemoryBackend(CacheBackend):
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
with lock:
with self._lock:
region_cache.clear()
logger.debug(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
with lock:
with self._lock:
region_cache.clear()
logger.info("Cleared all cache")
@@ -480,7 +481,7 @@ class MemoryBackend(CacheBackend):
yield from ()
return
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
with self._lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
@@ -507,18 +508,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.cache_type = cache_type
self.maxsize = maxsize or DEFAULT_CACHE_SIZE
self.ttl = ttl or DEFAULT_CACHE_TTL
# 存储各个 region 的缓存实例:region -> TTLCache
self._region_caches: Dict[str, Union[MemoryTTLCache, MemoryLRUCache]] = {}
def __get_region_cache(self, region: str) -> Optional[Union[MemoryTTLCache, MemoryLRUCache]]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
region = self.get_region(region)
return self._region_caches.get(region)
self._backend = MemoryBackend(cache_type=cache_type, maxsize=maxsize, ttl=ttl)
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
@@ -530,18 +520,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param ttl: 缓存的存活时间,不传入为永久缓存,单位秒
:param region: 缓存的区
"""
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 设置缓存值
with lock:
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(
region,
MemoryTTLCache(maxsize=maxsize, ttl=ttl) if self.cache_type == 'ttl'
else MemoryLRUCache(maxsize=maxsize)
)
region_cache[key] = value
return self._backend.set(key=key, value=value, ttl=ttl, region=region, **kwargs)
async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
"""
@@ -551,10 +530,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return False
return key in region_cache
return self._backend.exists(key=key, region=region)
async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any:
"""
@@ -564,10 +540,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
return region_cache.get(key)
return self._backend.get(key=key, region=region)
async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION):
"""
@@ -576,11 +549,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param key: 缓存的键
:param region: 缓存的区
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
with lock:
del region_cache[key]
return self._backend.delete(key=key, region=region)
async def clear(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
"""
@@ -588,19 +557,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区,为None时清空所有区缓存
"""
if region:
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
with lock:
region_cache.clear()
logger.debug(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
with lock:
region_cache.clear()
logger.info("All cache cleared")
return self._backend.clear(region=region)
async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> AsyncGenerator[Tuple[str, Any], None]:
"""
@@ -609,14 +566,7 @@ class AsyncMemoryBackend(AsyncCacheBackend):
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return
# 使用锁保护迭代过程,避免在迭代时缓存被修改
with lock:
# 创建快照避免并发修改问题
items_snapshot = list(region_cache.items())
for item in items_snapshot:
for item in self._backend.items(region):
yield item
async def close(self) -> None:
@@ -1115,15 +1065,16 @@ def AsyncCache(cache_type: Literal['ttl', 'lru'] = 'ttl',
def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Optional[int] = None,
skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False):
skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False, shared_key: Optional[str] = None):
"""
自定义缓存装饰器,支持为每个 key 动态传递 maxsize 和 ttl
:param region: 缓存
:param maxsize: 缓存的最大条目数
:param region: 缓存区域的标识符,默认根据模块名、函数名等自动生成标识
:param maxsize: 缓存区内的最大条目数
:param ttl: 缓存的存活时间,单位秒,未传入则为永久缓存,单位秒
:param skip_none: 跳过 None 缓存,默认为 True
:param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False
:param shared_key: 同步/异步函数共享缓存的键,默认使用函数名(异步函数名会标准化为同步格式,如移除 `async_` 前缀)
:return: 装饰器函数
"""
@@ -1173,6 +1124,17 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
return False
return True
def __standardize_func_name() -> str:
    """
    Normalize an async function's name to its sync counterpart's name,
    so both variants generate the same cache key and share one cache.
    """
    # XXX Assumes the async name differs from the sync one only by an
    # `async_` prefix or an `_async` suffix (true for most current MP code);
    # otherwise the caller must pass `shared_key` explicitly.
    return (
        func.__name__.removeprefix("async_").removesuffix("_async")
        if is_async
        else func.__name__
    )
def __get_cache_key(args, kwargs) -> str:
"""
根据函数和参数生成缓存键
@@ -1194,13 +1156,22 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 1024, ttl: Opt
bound.arguments[param] for param in signature.parameters if param in bound.arguments
]
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
# 获取缓存区
cache_region = region if region is not None else f"{func.__module__}.{func.__name__}"
return f"{func_name}_{hashkey(*keys)}"
# 被装饰函数的上层名称(如类名或外层函数名)
enclosing_name = (
func.__qualname__[:last_dot]
if (last_dot := func.__qualname__.rfind(".")) != -1
else ""
)
# 检查是否为异步函数
is_async = inspect.iscoroutinefunction(func)
# 生成标准化后的函数名称,用于同步/异步函数共享缓存
func_name = shared_key if shared_key else __standardize_func_name()
# 获取缓存区
cache_region = (
region if region is not None else f"{func.__module__}:{enclosing_name}:{func_name}"
)
if is_async:
# 异步函数使用异步缓存后端

View File

@@ -290,3 +290,11 @@ class BangumiModule(_ModuleBase):
if infos:
return [MediaInfo(bangumi_info=info) for info in infos]
return []
def clear_cache(self):
    """
    Clear this module's cached Bangumi API responses by delegating
    to the underlying BangumiApi instance.
    """
    logger.info(f"开始清除{self.get_name()}缓存 ...")
    self.bangumiapi.clear_cache()
    logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -31,7 +31,7 @@ class BangumiApi(object):
self._req = RequestUtils(ua=settings.NORMAL_USER_AGENT, session=self._session)
self._async_req = AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT)
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta, shared_key="get")
def __invoke(self, url, key: Optional[str] = None, **kwargs):
req_url = self._base_url + url
params = {}
@@ -47,7 +47,7 @@ class BangumiApi(object):
print(e)
return None
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.bangumi, ttl=settings.CONF.meta, shared_key="get")
async def __async_invoke(self, url, key: Optional[str] = None, **kwargs):
req_url = self._base_url + url
params = {}
@@ -300,6 +300,12 @@ class BangumiApi(object):
key="data",
_ts=datetime.strftime(datetime.now(), '%Y%m%d'), **kwargs)
def clear_cache(self):
    """
    Clear the cached responses of __invoke. Since __invoke and
    __async_invoke are decorated with the same shared_key ("get"),
    this clears the cache shared by both the sync and async paths.
    """
    self.__invoke.cache_clear()
def close(self):
if self._session:
self._session.close()

View File

@@ -154,7 +154,6 @@ class DoubanApi(metaclass=WeakSingleton):
_api_url = "https://api.douban.com/v2"
def __init__(self):
self.__clear_async_cache__ = False
self._session = requests.Session()
@classmethod
@@ -225,7 +224,7 @@ class DoubanApi(metaclass=WeakSingleton):
"""
return resp.json() if resp is not None else None
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="get")
def __invoke(self, url: str, **kwargs) -> dict:
"""
GET请求
@@ -237,14 +236,11 @@ class DoubanApi(metaclass=WeakSingleton):
).get_res(url=req_url, params=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="get")
async def __async_invoke(self, url: str, **kwargs) -> dict:
"""
GET请求异步版本
"""
if self.__clear_async_cache__:
self.__clear_async_cache__ = False
await self.__async_invoke.cache_clear()
req_url, params = self._prepare_get_request(url, **kwargs)
resp = await AsyncRequestUtils(
ua=choice(self._user_agents)
@@ -263,7 +259,7 @@ class DoubanApi(metaclass=WeakSingleton):
params.pop('_ts')
return req_url, params
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="post")
def __post(self, url: str, **kwargs) -> dict:
"""
POST请求
@@ -285,7 +281,7 @@ class DoubanApi(metaclass=WeakSingleton):
).post_res(url=req_url, data=params)
return self._handle_response(resp)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True)
@cached(maxsize=settings.CONF.douban, ttl=settings.CONF.meta, skip_none=True, shared_key="post")
async def __async_post(self, url: str, **kwargs) -> dict:
"""
POST请求异步版本
@@ -865,7 +861,7 @@ class DoubanApi(metaclass=WeakSingleton):
清空LRU缓存
"""
self.__invoke.cache_clear()
self.__clear_async_cache__ = True
self.__post.cache_clear()
def close(self):
if self._session:

View File

@@ -440,7 +440,7 @@ class FanartModule(_ModuleBase):
return result
@classmethod
@cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta)
@cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get")
def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]:
if media_type == MediaType.MOVIE:
image_url = cls._movie_url % queryid
@@ -456,3 +456,11 @@ class FanartModule(_ModuleBase):
except Exception as err:
logger.error(f"获取{queryid}的Fanart图片失败{str(err)}")
return None
def clear_cache(self):
    """
    Clear this module's cached Fanart image lookups
    (the @cached results of __request_fanart).
    """
    logger.info(f"开始清除{self.get_name()}缓存 ...")
    self.__request_fanart.cache_clear()
    logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -29,7 +29,7 @@ class TNodeSpider(metaclass=SingletonClass):
self._ua = indexer.get('ua')
self._timeout = indexer.get('timeout') or 15
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True)
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True, shared_key="get_token")
def __get_token(self) -> Optional[str]:
if not self._domain:
return
@@ -43,7 +43,7 @@ class TNodeSpider(metaclass=SingletonClass):
return csrf_token.group(1)
return None
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True)
@cached(region="indexer_spider", maxsize=1, ttl=60 * 60 * 24, skip_empty=True, shared_key="get_token")
async def __async_get_token(self) -> Optional[str]:
if not self._domain:
return

View File

@@ -1625,6 +1625,9 @@ class TmdbApi:
"""
清除缓存
"""
self.match_web.cache_clear()
self.discover.discover_movies.cache_clear()
self.discover.discover_tv_shows.cache_clear()
self.tmdb.cache_clear()
# 私有异步方法

View File

@@ -40,8 +40,6 @@ class TMDb(object):
self._reset = None
self._timeout = 15
self.__clear_async_cache__ = False
@property
def page(self):
return self._page
@@ -129,7 +127,6 @@ class TMDb(object):
return req
def cache_clear(self):
self.__clear_async_cache__ = True
return self.request.cache_clear()
def _validate_api_key(self):
@@ -200,7 +197,7 @@ class TMDb(object):
if rate_limit_result:
logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result)
time.sleep(rate_limit_result)
return self._request_obj(action, params, call_cached, method, data, json, key)
return self._request_obj(action, params, False, method, data, json, key)
json_data = req.json()
self._process_json_response(json_data, is_async=False)
@@ -215,10 +212,6 @@ class TMDb(object):
self._validate_api_key()
url = self._build_url(action, params)
if self.__clear_async_cache__:
self.__clear_async_cache__ = False
await self.async_request.cache_clear()
async with async_fresh(not call_cached or method == "POST"):
req = await self.async_request(method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d'))
@@ -232,7 +225,7 @@ class TMDb(object):
if rate_limit_result:
logger.warning("达到请求频率限制,将在 %d 秒后重试..." % rate_limit_result)
await asyncio.sleep(rate_limit_result)
return await self._async_request_obj(action, params, call_cached, method, data, json, key)
return await self._async_request_obj(action, params, False, method, data, json, key)
json_data = req.json()
self._process_json_response(json_data, is_async=True)

View File

@@ -162,3 +162,12 @@ class TheTvDbModule(_ModuleBase):
except Exception as err:
logger.error(f"用标题搜索TVDB剧集失败 ({title}): {str(err)}")
return []
def clear_cache(self):
    """
    Clear this module's cache by delegating to the TVDB client,
    if one is currently available.
    """
    logger.info(f"开始清除{self.get_name()}缓存 ...")
    # Guard: self.tvdb may be unset/None — only clear when a client exists
    if tvdb := self.tvdb:
        tvdb.clear_cache()
    logger.info(f"{self.get_name()}缓存清除完成")

View File

@@ -618,3 +618,9 @@ class TVDB:
"""
url = self.url.construct('user/favorites')
return self.request.make_request(url)
def clear_cache(self):
    """
    Clear the cached results of request.make_request
    (the @cached HTTP-request layer of this client).
    """
    self.request.make_request.cache_clear()