diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index a15f50ea..ad60ae00 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -4,6 +4,7 @@ import json import re from collections import deque from datetime import datetime +from pathlib import Path from typing import Optional, Union, Annotated import aiofiles @@ -16,6 +17,7 @@ from fastapi.responses import StreamingResponse from app import schemas from app.chain.search import SearchChain from app.chain.system import SystemChain +from app.core.cache import get_async_file_cache_backend from app.core.config import global_vars, settings from app.core.event import eventmanager from app.core.metainfo import MetaInfo @@ -28,7 +30,6 @@ from app.db.user_oper import get_current_active_superuser, get_current_active_su from app.helper.mediaserver import MediaServerHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper -from app.helper.redis import AsyncRedisHelper from app.helper.rule import RuleHelper from app.helper.sites import SitesHelper # noqa # noqa from app.helper.subscribe import SubscribeHelper @@ -67,53 +68,28 @@ async def fetch_image( # 缓存路径 sanitized_path = SecurityUtils.sanitize_url_path(url) - base_path = AsyncPath(settings.CACHE_PATH) - cache_path = base_path / "images" / sanitized_path + cache_path = Path("images") / sanitized_path + if not cache_path.suffix: + # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 + cache_path = cache_path.with_suffix(".jpg") + + # 缓存对像 + cache_backend = get_async_file_cache_backend(base=settings.CACHE_PATH) if use_cache: - if settings.CACHE_BACKEND_TYPE == "redis": - # 使用Redis缓存 - redis_helper = AsyncRedisHelper() - content = await redis_helper.get(sanitized_path, region="image_cache") - if content: - # 检查 If-None-Match - etag = HashUtils.md5(content) - if if_none_match == etag: - headers = RequestUtils.generate_cache_headers() - return Response(status_code=304, headers=headers) - # 返回缓存图片 - headers = RequestUtils.generate_cache_headers(etag) - return Response( - content=content, - media_type=UrlUtils.get_mime_type(url, "image/jpeg"), - headers=headers - ) - else: - # 使用磁盘缓存 - if not cache_path.suffix: - # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 - cache_path = cache_path.with_suffix(".jpg") - - # 确保缓存路径和文件类型合法 - if not await SecurityUtils.async_is_safe_path(base_path=base_path, - user_path=cache_path, - allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES): - raise HTTPException(status_code=400, detail="Invalid cache path or file type") - - # 目前暂不考虑磁盘缓存文件是否过期,通过缓存清理机制处理 - if cache_path and await cache_path.exists(): - try: - # 读取磁盘缓存图片返回 - async with aiofiles.open(cache_path, 'rb') as f: - content = await f.read() - etag = HashUtils.md5(content) - headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7) - if if_none_match == etag: - return Response(status_code=304, headers=headers) - return Response(content=content, media_type="image/jpeg", headers=headers) - except Exception as e: - # 如果读取磁盘缓存发生异常,这里仅记录日志,尝试再次请求远端进行处理 - logger.debug(f"Failed to read cache file {cache_path}: {e}") + content = await cache_backend.get(cache_path.as_posix(), region="images") + if content: + # 检查 If-None-Match + etag = HashUtils.md5(content) + headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7) + if if_none_match == etag: + return Response(status_code=304, headers=headers) + # 返回缓存图片 + return Response( + content=content, + media_type=UrlUtils.get_mime_type(url, "image/jpeg"), + headers=headers + ) # 请求远程图片 referer = "https://movie.douban.com/" if "doubanio.com" in url else None @@ -138,21 +114,8 @@ async def fetch_image( # 保存缓存 if use_cache: - if settings.CACHE_BACKEND_TYPE == "redis": - # 保存到Redis缓存 - redis_helper = AsyncRedisHelper() - await redis_helper.set(sanitized_path, content, region="image_cache") - else: - # 保存到磁盘缓存 - try: - if not await cache_path.parent.exists(): - await cache_path.parent.mkdir(parents=True, exist_ok=True) - async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: - await tmp_file.write(content) - temp_path = AsyncPath(tmp_file.name) - await temp_path.replace(cache_path) - except Exception as e: - logger.debug(f"Failed to write cache file {cache_path}: {e}") + await cache_backend.set(cache_path.as_posix(), content, region="images") + logger.debug(f"Image cached at {cache_path.as_posix()}") # 检查 If-None-Match etag = HashUtils.md5(content) diff --git a/app/chain/__init__.py b/app/chain/__init__.py index 06dfb1cf..f5899060 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -7,13 +7,11 @@ from collections.abc import Callable from pathlib import Path from typing import Optional, Any, Tuple, List, Set, Union, Dict -import aiofiles -from anyio import Path as AsyncPath from fastapi.concurrency import run_in_threadpool from qbittorrentapi import TorrentFilesList from transmission_rpc import File -from app.core.cache import get_cache_backend +from app.core.cache import get_file_cache_backend, get_async_file_cache_backend from app.core.config import settings from app.core.context import Context, MediaInfo, TorrentInfo from app.core.event import EventManager @@ -48,137 +46,66 @@ class ChainBase(metaclass=ABCMeta): send_callback=self.run_module ) self.pluginmanager = PluginManager() - # 文件类缓存,保留1 - self._cache = get_cache_backend(ttl=30 * 24 * 3600) + self.filecache = get_file_cache_backend() + self.async_filecache = get_async_file_cache_backend() def load_cache(self, filename: str) -> Any: """ - 加载缓存,优先从Redis读取,没有数据时从本地读取(兼容存量未迁移数据) + 加载缓存 """ - # 如果Redis可用,优先从Redis读取 - if self._cache.is_redis(): - try: - cache_data = self._cache.get(filename, region="chain_cache") - if cache_data is not None: - logger.debug(f"从Redis加载缓存: {filename}") - return cache_data - except Exception as e: - logger.warning(f"从Redis加载缓存 {filename} 失败: {e}") - - # 从本地文件读取(兼容存量数据) - cache_path = settings.TEMP_PATH / filename - if cache_path.exists(): - try: - with open(cache_path, 'rb') as f: - return pickle.load(f) - except Exception as err: - logger.error(f"加载缓存 {filename} 出错:{str(err)}") - return None + content = self.filecache.get(filename) + if not content: + return None + try: + return pickle.loads(content) + except Exception as err: + logger.error(f"加载缓存 {filename} 出错:{str(err)}") + return None async def async_load_cache(self, filename: str) -> Any: """ - 异步加载缓存,优先从Redis读取,没有数据时从本地读取(兼容存量未迁移数据) + 异步加载缓存 """ - # 如果Redis可用,优先从Redis读取 - if self._cache.is_redis(): - try: - cache_data = self._cache.get(filename, region="chain_cache") - if cache_data is not None: - logger.debug(f"从Redis异步加载缓存: {filename}") - return cache_data - except Exception as e: - logger.warning(f"从Redis异步加载缓存 {filename} 失败: {e}") - - # 从本地文件读取(兼容存量数据) - cache_path = settings.TEMP_PATH / filename - if cache_path.exists(): - try: - async with aiofiles.open(cache_path, 'rb') as f: - content = await f.read() - return pickle.loads(content) - except Exception as err: - logger.error(f"异步加载缓存 {filename} 出错:{str(err)}") - return None + content = await self.async_filecache.get(filename) + if not content: + return None + try: + return pickle.loads(content) + except Exception as err: + logger.error(f"异步加载缓存 {filename} 出错:{str(err)}") + return None async def async_save_cache(self, cache: Any, filename: str) -> None: """ - 异步保存缓存,优先保存到Redis,同时保存到本地作为备份 + 异步保存缓存 """ - # 如果Redis可用,优先保存到Redis - if self._cache.is_redis(): - try: - self._cache.set(filename, cache, region="chain_cache") - logger.debug(f"异步保存缓存到Redis: {filename}") - except Exception as e: - logger.warning(f"异步保存缓存到Redis失败: {e}") - else: - # 保存到本地 - try: - async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f: - await f.write(pickle.dumps(cache)) - except Exception as err: - logger.error(f"异步保存缓存到本地 {filename} 出错:{str(err)}") + try: + await self.async_filecache.set(filename, pickle.dumps(cache)) + except Exception as err: + logger.error(f"异步保存缓存 {filename} 出错:{str(err)}") + return def save_cache(self, cache: Any, filename: str) -> None: """ - 保存缓存,优先保存到Redis,同时保存到本地作为备份 + 保存缓存 """ - # 如果Redis可用,优先保存到Redis - if self._cache.is_redis(): - try: - self._cache.set(filename, cache, region="chain_cache") - logger.debug(f"保存缓存到Redis: {filename}") - except Exception as e: - logger.warning(f"保存缓存到Redis失败: {e}") - else: - # 保存到本地 - try: - with open(settings.TEMP_PATH / filename, 'wb') as f: - pickle.dump(cache, f) # noqa - except Exception as err: - logger.error(f"保存缓存到本地 {filename} 出错:{str(err)}") + try: + self.filecache.set(filename, pickle.dumps(cache)) + except Exception as err: + logger.error(f"保存缓存 {filename} 出错:{str(err)}") + return def remove_cache(self, filename: str) -> None: """ 删除缓存,同时删除Redis和本地缓存 """ - # 如果Redis可用,删除Redis缓存 - if self._cache.is_redis(): - try: - self._cache.delete(filename, region="chain_cache") - logger.debug(f"删除Redis缓存: {filename}") - except Exception as e: - logger.warning(f"删除Redis缓存失败: {e}") - - # 删除本地缓存 - cache_path = settings.TEMP_PATH / filename - if cache_path.exists(): - try: - cache_path.unlink() - logger.debug(f"删除本地缓存: {filename}") - except Exception as e: - logger.warning(f"删除本地缓存失败: {e}") + self.filecache.delete(filename) async def async_remove_cache(self, filename: str) -> None: """ 异步删除缓存,同时删除Redis和本地缓存 """ - # 如果Redis可用,删除Redis缓存 - if self._cache.is_redis(): - try: - self._cache.delete(filename, region="chain_cache") - logger.debug(f"异步删除Redis缓存: {filename}") - except Exception as e: - logger.warning(f"异步删除Redis缓存失败: {e}") - - # 删除本地缓存 - cache_path = AsyncPath(settings.TEMP_PATH) / filename - if await cache_path.exists(): - try: - await cache_path.unlink() - logger.debug(f"异步删除本地缓存: {filename}") - except Exception as err: - logger.error(f"异步删除本地缓存 {filename} 出错:{str(err)}") + pass @staticmethod def __is_valid_empty(ret): diff --git a/app/chain/recommend.py b/app/chain/recommend.py index cb59cf5e..cde78336 100644 --- a/app/chain/recommend.py +++ b/app/chain/recommend.py @@ -1,5 +1,4 @@ import io -import tempfile from pathlib import Path from typing import List, Optional @@ -10,7 +9,7 @@ from app.chain import ChainBase from app.chain.bangumi import BangumiChain from app.chain.douban import DoubanChain from app.chain.tmdb import TmdbChain -from app.core.cache import cached +from app.core.cache import cached, get_file_cache_backend from app.core.config import settings, global_vars from app.log import logger from app.schemas import MediaType @@ -37,8 +36,6 @@ class RecommendChain(ChainBase, metaclass=Singleton): 刷新推荐 """ logger.debug("Starting to refresh Recommend data.") - self._cache.clear(region=self.recommend_cache_region) - logger.debug("Recommend Cache has been cleared.") # 推荐来源方法 recommend_methods = [ @@ -100,33 +97,26 @@ class RecommendChain(ChainBase, metaclass=Singleton): logger.debug(f"Caching poster image: {poster_url}") self.__fetch_and_save_image(poster_url) - def __fetch_and_save_image(self, url: str): + @staticmethod + def __fetch_and_save_image(url: str): """ 请求并保存图片 :param url: 图片路径 """ # 生成缓存路径 sanitized_path = SecurityUtils.sanitize_url_path(url) - cache_path = settings.CACHE_PATH / "images" / sanitized_path + cache_path = Path("images") / sanitized_path + # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 + if not cache_path.suffix: + cache_path = cache_path.with_suffix(".jpg") - if self._cache.is_redis(): - if self._cache.get(sanitized_path, region=self.recommend_cache_region): - logger.debug(f"Cache hit: Image already exists for URL: {url}") - return - else: - # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 - if not cache_path.suffix: - cache_path = cache_path.with_suffix(".jpg") + # 获取缓存后端 + cache_backend = get_file_cache_backend(base=settings.CACHE_PATH) - # 确保缓存路径和文件类型合法 - if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES): - logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}") - return - - # 本地存在缓存图片,则直接跳过 - if cache_path.exists(): - logger.debug(f"Cache hit: Image already exists at {cache_path}") - return + # 本地存在缓存图片,则直接跳过 + if cache_backend.get(cache_path.as_posix(), region="images"): + logger.debug(f"Cache hit: Image already exists at {cache_path}") + return # 请求远程图片 referer = "https://movie.douban.com/" if "doubanio.com" in url else None @@ -142,25 +132,10 @@ class RecommendChain(ChainBase, metaclass=Singleton): except Exception as e: logger.debug(f"Invalid image format for URL {url}: {e}") return - if self._cache.is_redis(): - # 如果是Redis缓存,直接存储到缓存中 - try: - self._cache.set(sanitized_path, response.content, region=self.recommend_cache_region) - logger.debug(f"Successfully cached image for URL: {url} in Redis.") - except Exception as e: - logger.debug(f"Failed to cache image for URL {url} in Redis: {e}") - else: - # 如果是本地文件缓存,写入到指定路径 - try: - if not cache_path.parent.exists(): - cache_path.parent.mkdir(parents=True, exist_ok=True) - with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: - tmp_file.write(response.content) - temp_path = Path(tmp_file.name) - temp_path.replace(cache_path) - logger.debug(f"Successfully cached image at {cache_path} for URL: {url}") - except Exception as e: - logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}") + + # 保存缓存 + cache_backend.set(cache_path.as_posix(), response.content, region="images") + logger.debug(f"Successfully cached image at {cache_path} for URL: {url}") @log_execution_time(logger=logger) @cached(ttl=recommend_ttl, region=recommend_cache_region) diff --git a/app/core/cache.py b/app/core/cache.py index e1bb6087..6111b3e1 100644 --- a/app/core/cache.py +++ b/app/core/cache.py @@ -1,14 +1,20 @@ import inspect +import shutil +import tempfile import threading from abc import ABC, abstractmethod from functools import wraps +from pathlib import Path from typing import Any, Dict, Optional +import aiofiles +import aioshutil +from anyio import Path as AsyncPath from cachetools import TTLCache as CacheToolsTTLCache from cachetools.keys import hashkey from app.core.config import settings -from app.helper.redis import RedisHelper +from app.helper.redis import RedisHelper, AsyncRedisHelper from app.log import logger # 默认缓存区 @@ -130,17 +136,122 @@ class CacheBackend(ABC): return settings.CACHE_BACKEND_TYPE == "redis" +class AsyncCacheBackend(ABC): + """ + 缓存后端基类,定义通用的缓存接口(异步) + """ + + @abstractmethod + async def set(self, key: str, value: Any, ttl: Optional[int] = None, + region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: + """ + 设置缓存 + + :param key: 缓存的键 + :param value: 缓存的值 + :param ttl: 缓存的存活时间,单位秒 + :param region: 缓存的区 + :param kwargs: 其他参数 + """ + pass + + @abstractmethod + async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: + """ + 判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回 True,否则返回 False + """ + pass + + @abstractmethod + async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Any: + """ + 获取缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回 None + """ + pass + + @abstractmethod + async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: + """ + 删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + pass + + @abstractmethod + async def clear(self, region: Optional[str] = None) -> None: + """ + 清除指定区域的缓存或全部缓存 + + :param region: 缓存的区 + """ + pass + + @abstractmethod + async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + pass + + @abstractmethod + async def close(self) -> None: + """ + 关闭缓存连接 + """ + pass + + @staticmethod + def get_region(region: Optional[str] = DEFAULT_CACHE_REGION): + """ + 获取缓存的区 + """ + return f"region:{region}" if region else "region:default" + + @staticmethod + def get_cache_key(func, args, kwargs): + """ + 获取缓存的键,通过哈希函数对函数的参数进行处理 + :param func: 被装饰的函数 + :param args: 位置参数 + :param kwargs: 关键字参数 + :return: 缓存键 + """ + signature = inspect.signature(func) + # 绑定传入的参数并应用默认值 + bound = signature.bind(*args, **kwargs) + bound.apply_defaults() + # 忽略第一个参数,如果它是实例(self)或类(cls) + parameters = list(signature.parameters.keys()) + if parameters and parameters[0] in ("self", "cls"): + bound.arguments.pop(parameters[0], None) + # 按照函数签名顺序提取参数值列表 + keys = [ + bound.arguments[param] for param in signature.parameters if param in bound.arguments + ] + # 使用有序参数生成缓存键 + return f"{func.__name__}_{hashkey(*keys)}" + + @staticmethod + def is_redis() -> bool: + return settings.CACHE_BACKEND_TYPE == "redis" + + class CacheToolsBackend(CacheBackend): """ 基于 `cachetools.TTLCache` 实现的缓存后端 - - 特性: - - 支持动态设置缓存的 TTL(Time To Live,存活时间)和最大条目数(Maxsize) - - 缓存实例按区域(region)划分,不同 region 拥有独立的缓存实例 - - 同一 region 共享相同的 TTL 和 Maxsize,设置时只能作用于整个 region - - 限制: - - 不支持按 `key` 独立隔离 TTL 和 Maxsize,仅支持作用于 region 级别 """ def __init__(self, maxsize: Optional[int] = 1024, ttl: Optional[int] = 1800): @@ -263,18 +374,9 @@ class CacheToolsBackend(CacheBackend): class RedisBackend(CacheBackend): """ 基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存 - - 特性: - - 支持动态设置缓存的 TTL(Time To Live,存活时间) - - 支持分区域(region)管理缓存,不同的 region 采用独立的命名空间 - - 支持自定义最大内存限制(maxmemory)和内存淘汰策略(如 allkeys-lru) - - 限制: - - 由于 Redis 的分布式特性,写入和读取可能受到网络延迟的影响 - - Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存 """ - def __init__(self, ttl: Optional[int] = 1800): + def __init__(self, ttl: Optional[int] = None): """ 初始化 Redis 缓存实例 @@ -350,19 +452,342 @@ class RedisBackend(CacheBackend): self.redis_helper.close() +class AsyncRedisBackend(AsyncCacheBackend): + """ + 基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存 + """ + + def __init__(self, ttl: Optional[int] = None): + """ + 初始化 Redis 缓存实例 + + :param ttl: 缓存的存活时间,单位秒 + """ + self.ttl = ttl + self.redis_helper = AsyncRedisHelper() + + async def set(self, key: str, value: Any, ttl: Optional[int] = None, + region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: + """ + 设置缓存 + + :param key: 缓存的键 + :param value: 缓存的值 + :param ttl: 缓存的存活时间,单位秒如果未传入则使用默认值 + :param region: 缓存的区 + :param kwargs: kwargs + """ + ttl = ttl or self.ttl + await self.redis_helper.set(key, value, ttl=ttl, region=region, **kwargs) + + async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: + """ + 判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回 True,否则返回 False + """ + return await self.redis_helper.exists(key, region=region) + + async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]: + """ + 获取缓存的值 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回 None + """ + return await self.redis_helper.get(key, region=region) + + async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: + """ + 删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + await self.redis_helper.delete(key, region=region) + + async def clear(self, region: Optional[str] = None) -> None: + """ + 清除指定区域的缓存或全部缓存 + + :param region: 缓存的区 + """ + await self.redis_helper.clear(region=region) + + async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + return await self.redis_helper.items(region=region) + + async def close(self) -> None: + """ + 关闭 Redis 客户端的连接池 + """ + await self.redis_helper.close() + + +class FileBackend(CacheBackend): + """ + 基于 文件系统 实现的缓存后端 + """ + + def __init__(self, base: Path): + """ + 初始化文件缓存实例 + """ + self.base = base + if not self.base.exists(): + self.base.mkdir(parents=True, exist_ok=True) + + def set(self, key: str, value: Any, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: + """ + 设置缓存 + + :param key: 缓存的键 + :param value: 缓存的值 + :param region: 缓存的区 + :param kwargs: kwargs + """ + cache_path = self.base / region / key + # 确保缓存目录存在 + cache_path.parent.mkdir(parents=True, exist_ok=True) + # 将值序列化为字符串存储 + with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: + tmp_file.write(value) + temp_path = Path(tmp_file.name) + temp_path.replace(cache_path) + + def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: + """ + 判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回 True,否则返回 False + """ + cache_path = self.base / key + return cache_path.exists() + + def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]: + """ + 获取缓存的值 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回 None + """ + cache_path = self.base / region / key + if not cache_path.exists(): + return None + with open(cache_path, 'rb') as f: + return f.read() + + def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: + """ + 删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + cache_path = self.base / region / key + if cache_path.exists(): + cache_path.unlink() + + def clear(self, region: Optional[str] = None) -> None: + """ + 清除指定区域的缓存或全部缓存 + + :param region: 缓存的区 + """ + if region: + # 清理指定缓存区 + cache_path = self.base / region + if cache_path.exists(): + for item in cache_path.iterdir(): + if item.is_file(): + item.unlink() + else: + shutil.rmtree(item, ignore_errors=True) + else: + # 清除所有区域的缓存 + for item in self.base.iterdir(): + if item.is_file(): + item.unlink() + else: + shutil.rmtree(item, ignore_errors=True) + + def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + cache_path = self.base / region + if not cache_path.exists(): + return {} + for item in cache_path.iterdir(): + if item.is_file(): + with open(item, 'r') as f: + yield f.read() + + def close(self) -> None: + """ + 关闭 Redis 客户端的连接池 + """ + pass + + +class AsyncFileBackend(AsyncCacheBackend): + """ + 基于 文件系统 实现的缓存后端(异步模式) + """ + + def __init__(self, base: Path): + """ + 初始化文件缓存实例 + """ + self.base = base + if not self.base.exists(): + self.base.mkdir(parents=True, exist_ok=True) + + async def set(self, key: str, value: Any, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: + """ + 设置缓存 + + :param key: 缓存的键 + :param value: 缓存的值 + :param region: 缓存的区 + :param kwargs: kwargs + """ + cache_path = AsyncPath(self.base) / region / key + # 确保缓存目录存在 + await cache_path.parent.mkdir(parents=True, exist_ok=True) + # 保存文件 + async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: + await tmp_file.write(value) + temp_path = AsyncPath(tmp_file.name) + await temp_path.replace(cache_path) + + async def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool: + """ + 判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回 True,否则返回 False + """ + cache_path = AsyncPath(self.base) / region / key + return await cache_path.exists() + + async def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]: + """ + 获取缓存的值 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回 None + """ + cache_path = AsyncPath(self.base) / region / key + if not await cache_path.exists(): + return None + async with aiofiles.open(cache_path, 'rb') as f: + return await f.read() + + async def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None: + """ + 删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + cache_path = AsyncPath(self.base) / region / key + if await cache_path.exists(): + await cache_path.unlink() + + async def clear(self, region: Optional[str] = None) -> None: + """ + 清除指定区域的缓存或全部缓存 + + :param region: 缓存的区 + """ + if region: + # 清理指定缓存区 + cache_path = AsyncPath(self.base) / region + if await cache_path.exists(): + for item in cache_path.iterdir(): + if await item.is_file(): + await item.unlink() + else: + await aioshutil.rmtree(item, ignore_errors=True) + else: + # 清除所有区域的缓存 + for item in AsyncPath(self.base).iterdir(): + if await item.is_file(): + await item.unlink() + else: + await aioshutil.rmtree(item, ignore_errors=True) + + async def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + cache_path = AsyncPath(self.base) / region + if not await cache_path.exists(): + yield None + for item in cache_path.iterdir(): + if await item.is_file(): + async with aiofiles.open(item, 'r') as f: + yield await f.read() + + async def close(self) -> None: + """ + 关闭 Redis 客户端的连接池 + """ + pass + + +def get_file_cache_backend(base: Path = settings.TEMP_PATH) -> CacheBackend: + """ + 获取文件缓存后端实例(Redis或文件系统) + """ + if settings.CACHE_BACKEND_TYPE == "redis": + return RedisBackend() + else: + return FileBackend(base=base) + + +def get_async_file_cache_backend(base: Path = settings.TEMP_PATH) -> AsyncCacheBackend: + """ + 获取文件异步缓存后端实例(Redis或文件系统) + """ + if settings.CACHE_BACKEND_TYPE == "redis": + return AsyncRedisBackend() + else: + return AsyncFileBackend(base=base) + + def get_cache_backend(maxsize: Optional[int] = 512, ttl: Optional[int] = 1800) -> CacheBackend: """ - 根据配置获取缓存后端实例 + 根据配置获取缓存后端实例(内存或Redis) :param maxsize: 缓存的最大条目数,仅使用cachetools时生效 :param ttl: 缓存的默认存活时间,单位秒 :return: 返回缓存后端实例 """ if settings.CACHE_BACKEND_TYPE == "redis": - logger.debug(f"Attempting to use RedisBackend, TTL: {ttl}") return RedisBackend(ttl=ttl) else: - logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}") return CacheToolsBackend(maxsize=maxsize, ttl=ttl) @@ -376,7 +801,7 @@ class TTLCache: - 支持Redis和cachetools的切换 """ - def __init__(self, maxsize: int = 128, ttl: int = 600): + def __init__(self, maxsize: int = 128, ttl: int = 1800): """ 初始化TTL缓存 diff --git a/app/helper/redis.py b/app/helper/redis.py index 618e3e46..8fa72be3 100644 --- a/app/helper/redis.py +++ b/app/helper/redis.py @@ -525,6 +525,30 @@ class AsyncRedisHelper(metaclass=Singleton): except Exception as e: logger.error(f"Failed to clear cache (async), region: {region}, error: {e}") + async def items(self, region: Optional[str] = None): + """ + 获取指定区域的所有缓存键值对 + + :param region: 缓存的区 + :return: 返回键值对生成器 + """ + try: + await self._connect() + if region: + cache_region = self.get_region(quote(region)) + redis_key = f"{cache_region}:key:*" + for key in self.client.scan_iter(redis_key): + value = await self.client.get(key) + if value is not None: + yield key, self.deserialize(value) + else: + for key in self.client.scan_iter("*"): + value = await self.client.get(key) + if value is not None: + yield key, self.deserialize(value) + except Exception as e: + logger.error(f"Failed to get items from Redis, region: {region}, error: {e}") + async def test(self) -> bool: """ 异步测试Redis连接性