diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index a1bcca76..a15f50ea 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -10,7 +10,6 @@ import aiofiles import pillow_avif # noqa 用于自动注册AVIF支持 from PIL import Image from anyio import Path as AsyncPath -from app.helper.sites import SitesHelper # noqa # noqa from fastapi import APIRouter, Body, Depends, HTTPException, Header, Request, Response from fastapi.responses import StreamingResponse @@ -29,7 +28,9 @@ from app.db.user_oper import get_current_active_superuser, get_current_active_su from app.helper.mediaserver import MediaServerHelper from app.helper.message import MessageHelper from app.helper.progress import ProgressHelper +from app.helper.redis import AsyncRedisHelper from app.helper.rule import RuleHelper +from app.helper.sites import SitesHelper # noqa # noqa from app.helper.subscribe import SubscribeHelper from app.helper.system import SystemHelper from app.log import logger @@ -48,7 +49,7 @@ router = APIRouter() async def fetch_image( url: str, proxy: bool = False, - use_disk_cache: bool = False, + use_cache: bool = False, if_none_match: Optional[str] = None, allowed_domains: Optional[set[str]] = None) -> Response: """ @@ -64,37 +65,55 @@ async def fetch_image( if not SecurityUtils.is_safe_url(url, allowed_domains): raise HTTPException(status_code=404, detail="Unsafe URL") - # 后续观察系统性能表现,如果发现磁盘缓存和HTTP缓存无法满足高并发情况下的响应速度需求,可以考虑重新引入内存缓存 - cache_path: Optional[AsyncPath] = None - if use_disk_cache: - # 生成缓存路径 - base_path = AsyncPath(settings.CACHE_PATH) - sanitized_path = SecurityUtils.sanitize_url_path(url) - cache_path = base_path / "images" / sanitized_path + # 缓存路径 + sanitized_path = SecurityUtils.sanitize_url_path(url) + base_path = AsyncPath(settings.CACHE_PATH) + cache_path = base_path / "images" / sanitized_path - # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 - if not cache_path.suffix: - cache_path = cache_path.with_suffix(".jpg") - - # 确保缓存路径和文件类型合法 - if not await SecurityUtils.async_is_safe_path(base_path=base_path, - user_path=cache_path, - allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES): - raise HTTPException(status_code=400, detail="Invalid cache path or file type") - - # 目前暂不考虑磁盘缓存文件是否过期,后续通过缓存清理机制处理 - if cache_path and await cache_path.exists(): - try: - async with aiofiles.open(cache_path, 'rb') as f: - content = await f.read() + if use_cache: + if settings.CACHE_BACKEND_TYPE == "redis": + # 使用Redis缓存 + redis_helper = AsyncRedisHelper() + content = await redis_helper.get(sanitized_path, region="image_cache") + if content: + # 检查 If-None-Match etag = HashUtils.md5(content) - headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7) if if_none_match == etag: + headers = RequestUtils.generate_cache_headers() return Response(status_code=304, headers=headers) - return Response(content=content, media_type="image/jpeg", headers=headers) - except Exception as e: - # 如果读取磁盘缓存发生异常,这里仅记录日志,尝试再次请求远端进行处理 - logger.debug(f"Failed to read cache file {cache_path}: {e}") + # 返回缓存图片 + headers = RequestUtils.generate_cache_headers(etag) + return Response( + content=content, + media_type=UrlUtils.get_mime_type(url, "image/jpeg"), + headers=headers + ) + else: + # 使用磁盘缓存 + if not cache_path.suffix: + # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 + cache_path = cache_path.with_suffix(".jpg") + + # 确保缓存路径和文件类型合法 + if not await SecurityUtils.async_is_safe_path(base_path=base_path, + user_path=cache_path, + allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES): + raise HTTPException(status_code=400, detail="Invalid cache path or file type") + + # 目前暂不考虑磁盘缓存文件是否过期,通过缓存清理机制处理 + if cache_path and await cache_path.exists(): + try: + # 读取磁盘缓存图片返回 + async with aiofiles.open(cache_path, 'rb') as f: + content = await f.read() + etag = HashUtils.md5(content) + headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7) + if if_none_match == etag: + return Response(status_code=304, headers=headers) + return Response(content=content, media_type="image/jpeg", headers=headers) + except Exception as e: + # 如果读取磁盘缓存发生异常,这里仅记录日志,尝试再次请求远端进行处理 + logger.debug(f"Failed to read cache file {cache_path}: {e}") # 请求远程图片 referer = "https://movie.douban.com/" if "doubanio.com" in url else None @@ -112,22 +131,28 @@ async def fetch_image( logger.debug(f"Invalid image format for URL {url}: {e}") raise HTTPException(status_code=502, detail="Invalid image format") + # 获取请求响应头 response_headers = response.headers - cache_control_header = response_headers.get("Cache-Control", "") cache_directive, max_age = RequestUtils.parse_cache_control(cache_control_header) - # 如果需要使用磁盘缓存,则保存到磁盘 - if use_disk_cache and cache_path: - try: - if not await cache_path.parent.exists(): - await cache_path.parent.mkdir(parents=True, exist_ok=True) - async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: - await tmp_file.write(content) - temp_path = AsyncPath(tmp_file.name) - await temp_path.replace(cache_path) - except Exception as e: - logger.debug(f"Failed to write cache file {cache_path}: {e}") + # 保存缓存 + if use_cache: + if settings.CACHE_BACKEND_TYPE == "redis": + # 保存到Redis缓存 + redis_helper = AsyncRedisHelper() + await redis_helper.set(sanitized_path, content, region="image_cache") + else: + # 保存到磁盘缓存 + try: + if not await cache_path.parent.exists(): + await cache_path.parent.mkdir(parents=True, exist_ok=True) + async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: + await tmp_file.write(content) + temp_path = AsyncPath(tmp_file.name) + await temp_path.replace(cache_path) + except Exception as e: + logger.debug(f"Failed to write cache file {cache_path}: {e}") # 检查 If-None-Match etag = HashUtils.md5(content) @@ -135,8 +160,8 @@ async def fetch_image( headers = RequestUtils.generate_cache_headers(etag, cache_directive, max_age) return Response(status_code=304, headers=headers) + # 响应 headers = RequestUtils.generate_cache_headers(etag, cache_directive, max_age) - return Response( content=content, media_type=response_headers.get("Content-Type") or UrlUtils.get_mime_type(url, "image/jpeg"), @@ -159,7 +184,7 @@ async def proxy_img( hosts = [config.config.get("host") for config in MediaServerHelper().get_configs().values() if config and config.config and config.config.get("host")] allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) | set(hosts) - return await fetch_image(url=imgurl, proxy=proxy, use_disk_cache=cache, + return await fetch_image(url=imgurl, proxy=proxy, use_cache=cache, if_none_match=if_none_match, allowed_domains=allowed_domains) @@ -174,7 +199,7 @@ async def cache_img( """ # 如果没有启用全局图片缓存,则不使用磁盘缓存 proxy = "doubanio.com" not in url - return await fetch_image(url=url, proxy=proxy, use_disk_cache=settings.GLOBAL_IMAGE_CACHE, + return await fetch_image(url=url, proxy=proxy, use_cache=settings.GLOBAL_IMAGE_CACHE, if_none_match=if_none_match) diff --git a/app/chain/__init__.py b/app/chain/__init__.py index fb585500..06dfb1cf 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -13,6 +13,7 @@ from fastapi.concurrency import run_in_threadpool from qbittorrentapi import TorrentFilesList from transmission_rpc import File +from app.core.cache import get_cache_backend from app.core.config import settings from app.core.context import Context, MediaInfo, TorrentInfo from app.core.event import EventManager @@ -22,7 +23,6 @@ from app.core.plugin import PluginManager from app.db.message_oper import MessageOper from app.db.user_oper import UserOper from app.helper.message import MessageHelper, MessageQueueManager, MessageTemplateHelper -from app.helper.redis import RedisHelper from app.helper.service import ServiceConfigHelper from app.log import logger from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \ @@ -48,23 +48,17 @@ class ChainBase(metaclass=ABCMeta): send_callback=self.run_module ) self.pluginmanager = PluginManager() - # 初始化Redis缓存助手 - self._redis_helper = None - if settings.CACHE_BACKEND_TYPE == "redis": - try: - self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL) - except RuntimeError as e: - self._redis_helper = None - logger.warning(f"Redis缓存初始化失败,将使用本地缓存: {e}") + # 文件类缓存,保留1 + self._cache = get_cache_backend(ttl=30 * 24 * 3600) def load_cache(self, filename: str) -> Any: """ 加载缓存,优先从Redis读取,没有数据时从本地读取(兼容存量未迁移数据) """ # 如果Redis可用,优先从Redis读取 - if self._redis_helper: + if self._cache.is_redis(): try: - cache_data = self._redis_helper.get(filename, region="chain_cache") + cache_data = self._cache.get(filename, region="chain_cache") if cache_data is not None: logger.debug(f"从Redis加载缓存: {filename}") return cache_data @@ -86,9 +80,9 @@ class ChainBase(metaclass=ABCMeta): 异步加载缓存,优先从Redis读取,没有数据时从本地读取(兼容存量未迁移数据) """ # 如果Redis可用,优先从Redis读取 - if self._redis_helper: + if self._cache.is_redis(): try: - cache_data = self._redis_helper.get(filename, region="chain_cache") + cache_data = self._cache.get(filename, region="chain_cache") if cache_data is not None: logger.debug(f"从Redis异步加载缓存: {filename}") return cache_data @@ -111,9 +105,9 @@ class ChainBase(metaclass=ABCMeta): 异步保存缓存,优先保存到Redis,同时保存到本地作为备份 """ # 如果Redis可用,优先保存到Redis - if self._redis_helper: + if self._cache.is_redis(): try: - self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache") + self._cache.set(filename, cache, region="chain_cache") logger.debug(f"异步保存缓存到Redis: {filename}") except Exception as e: logger.warning(f"异步保存缓存到Redis失败: {e}") @@ -130,9 +124,9 @@ class ChainBase(metaclass=ABCMeta): 保存缓存,优先保存到Redis,同时保存到本地作为备份 """ # 如果Redis可用,优先保存到Redis - if self._redis_helper: + if self._cache.is_redis(): try: - self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache") + self._cache.set(filename, cache, region="chain_cache") logger.debug(f"保存缓存到Redis: {filename}") except Exception as e: logger.warning(f"保存缓存到Redis失败: {e}") @@ -149,9 +143,9 @@ class ChainBase(metaclass=ABCMeta): 删除缓存,同时删除Redis和本地缓存 """ # 如果Redis可用,删除Redis缓存 - if self._redis_helper: + if self._cache.is_redis(): try: - self._redis_helper.delete(filename, region="chain_cache") + self._cache.delete(filename, region="chain_cache") logger.debug(f"删除Redis缓存: {filename}") except Exception as e: logger.warning(f"删除Redis缓存失败: {e}") @@ -170,9 +164,9 @@ class ChainBase(metaclass=ABCMeta): 异步删除缓存,同时删除Redis和本地缓存 """ # 如果Redis可用,删除Redis缓存 - if self._redis_helper: + if self._cache.is_redis(): try: - self._redis_helper.delete(filename, region="chain_cache") + self._cache.delete(filename, region="chain_cache") logger.debug(f"异步删除Redis缓存: {filename}") except Exception as e: logger.warning(f"异步删除Redis缓存失败: {e}") diff --git a/app/chain/recommend.py b/app/chain/recommend.py index 6646b935..cb59cf5e 100644 --- a/app/chain/recommend.py +++ b/app/chain/recommend.py @@ -1,48 +1,166 @@ -import asyncio import io +import tempfile +from pathlib import Path from typing import List, Optional -import aiofiles import pillow_avif # noqa 用于自动注册AVIF支持 from PIL import Image -from anyio import Path as AsyncPath from app.chain import ChainBase from app.chain.bangumi import BangumiChain from app.chain.douban import DoubanChain from app.chain.tmdb import TmdbChain -from app.core.cache import cache_backend, cached +from app.core.cache import cached from app.core.config import settings, global_vars from app.log import logger from app.schemas import MediaType -from app.utils.asyncio import AsyncUtils from app.utils.common import log_execution_time -from app.utils.http import AsyncRequestUtils +from app.utils.http import RequestUtils from app.utils.security import SecurityUtils from app.utils.singleton import Singleton -# 推荐相关的专用缓存 -recommend_ttl = 24 * 3600 -recommend_cache_region = "recommend" - class RecommendChain(ChainBase, metaclass=Singleton): """ 推荐处理链,单例运行 """ - # 推荐数据的缓存页数 + # 推荐缓存时间 + recommend_ttl = 24 * 3600 + # 推荐缓存页数 cache_max_pages = 5 + # 推荐缓存区域 + recommend_cache_region = "recommend" def refresh_recommend(self): """ - 刷新推荐数据 - 同步包装器 + 刷新推荐 """ + logger.debug("Starting to refresh Recommend data.") + self._cache.clear(region=self.recommend_cache_region) + logger.debug("Recommend Cache has been cleared.") + + # 推荐来源方法 + recommend_methods = [ + self.tmdb_movies, + self.tmdb_tvs, + self.tmdb_trending, + self.bangumi_calendar, + self.douban_movie_showing, + self.douban_movies, + self.douban_tvs, + self.douban_movie_top250, + self.douban_tv_weekly_chinese, + self.douban_tv_weekly_global, + self.douban_tv_animation, + self.douban_movie_hot, + self.douban_tv_hot, + ] + + # 缓存并刷新所有推荐数据 + recommends = [] + # 记录哪些方法已完成 + methods_finished = set() + # 这里避免区间内连续调用相同来源,因此遍历方案为每页遍历所有推荐来源,再进行页数遍历 + for page in range(1, self.cache_max_pages + 1): + for method in recommend_methods: + if global_vars.is_system_stopped: + return + if method in methods_finished: + continue + logger.debug(f"Fetch {method.__name__} data for page {page}.") + data = method(page=page) + if not data: + logger.debug("All recommendation methods have finished fetching data. Ending pagination early.") + methods_finished.add(method) + continue + recommends.extend(data) + # 如果所有方法都已经完成,提前结束循环 + if len(methods_finished) == len(recommend_methods): + break + + # 缓存收集到的海报 + self.__cache_posters(recommends) + logger.debug("Recommend data refresh completed.") + + def __cache_posters(self, datas: List[dict]): + """ + 提取 poster_path 并缓存图片 + :param datas: 数据列表 + """ + if not settings.GLOBAL_IMAGE_CACHE: + return + + for data in datas: + if global_vars.is_system_stopped: + return + poster_path = data.get("poster_path") + if poster_path: + poster_url = poster_path.replace("original", "w500") + logger.debug(f"Caching poster image: {poster_url}") + self.__fetch_and_save_image(poster_url) + + def __fetch_and_save_image(self, url: str): + """ + 请求并保存图片 + :param url: 图片路径 + """ + # 生成缓存路径 + sanitized_path = SecurityUtils.sanitize_url_path(url) + cache_path = settings.CACHE_PATH / "images" / sanitized_path + + if self._cache.is_redis(): + if self._cache.get(sanitized_path, region=self.recommend_cache_region): + logger.debug(f"Cache hit: Image already exists for URL: {url}") + return + else: + # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 + if not cache_path.suffix: + cache_path = cache_path.with_suffix(".jpg") + + # 确保缓存路径和文件类型合法 + if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES): + logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}") + return + + # 本地存在缓存图片,则直接跳过 + if cache_path.exists(): + logger.debug(f"Cache hit: Image already exists at {cache_path}") + return + + # 请求远程图片 + referer = "https://movie.douban.com/" if "doubanio.com" in url else None + proxies = settings.PROXY if not referer else None + response = RequestUtils(ua=settings.NORMAL_USER_AGENT, proxies=proxies, referer=referer).get_res(url=url) + if not response: + logger.debug(f"Empty response for URL: {url}") + return + + # 验证下载的内容是否为有效图片 try: - AsyncUtils.run_async(self.async_refresh_recommend()) + Image.open(io.BytesIO(response.content)).verify() except Exception as e: - logger.error(f"刷新推荐数据失败:{str(e)}") - raise + logger.debug(f"Invalid image format for URL {url}: {e}") + return + if self._cache.is_redis(): + # 如果是Redis缓存,直接存储到缓存中 + try: + self._cache.set(sanitized_path, response.content, region=self.recommend_cache_region) + logger.debug(f"Successfully cached image for URL: {url} in Redis.") + except Exception as e: + logger.debug(f"Failed to cache image for URL {url} in Redis: {e}") + else: + # 如果是本地文件缓存,写入到指定路径 + try: + if not cache_path.parent.exists(): + cache_path.parent.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: + tmp_file.write(response.content) + temp_path = Path(tmp_file.name) + temp_path.replace(cache_path) + logger.debug(f"Successfully cached image at {cache_path} for URL: {url}") + except Exception as e: + logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}") @log_execution_time(logger=logger) @cached(ttl=recommend_ttl, region=recommend_cache_region) @@ -199,162 +317,6 @@ class RecommendChain(ChainBase, metaclass=Singleton): tvs = DoubanChain().tv_hot(page=page, count=count) return [media.to_dict() for media in tvs] if tvs else [] - # 异步版本的方法 - async def async_refresh_recommend(self): - """ - 异步刷新推荐 - """ - logger.debug("Starting to async refresh Recommend data.") - cache_backend.clear(region=recommend_cache_region) - logger.debug("Recommend Cache has been cleared.") - - # 推荐来源方法 - recommend_methods = [ - self.async_tmdb_movies, - self.async_tmdb_tvs, - self.async_tmdb_trending, - self.async_bangumi_calendar, - self.async_douban_movie_showing, - self.async_douban_movies, - self.async_douban_tvs, - self.async_douban_movie_top250, - self.async_douban_tv_weekly_chinese, - self.async_douban_tv_weekly_global, - self.async_douban_tv_animation, - self.async_douban_movie_hot, - self.async_douban_tv_hot, - ] - - # 缓存并刷新所有推荐数据 - recommends = [] - # 记录哪些方法已完成 - methods_finished = set() - # 这里避免区间内连续调用相同来源,因此遍历方案为每页遍历所有推荐来源,再进行页数遍历 - for page in range(1, self.cache_max_pages + 1): - # 为每个页面并发执行所有方法 - tasks = [] - for method in recommend_methods: - if global_vars.is_system_stopped: - return - if method in methods_finished: - continue - tasks.append(self._async_fetch_method_data(method, page, methods_finished)) - - # 并发执行所有任务 - if tasks: - results = await asyncio.gather(*tasks, return_exceptions=True) - for result in results: - if isinstance(result, list) and result: - recommends.extend(result) - - # 如果所有方法都已经完成,提前结束循环 - if len(methods_finished) == len(recommend_methods): - break - - # 缓存收集到的海报 - await self.__async_cache_posters(recommends) - logger.debug("Async recommend data refresh completed.") - - @staticmethod - async def _async_fetch_method_data(method, page: int, methods_finished: set): - """ - 异步获取方法数据的辅助函数 - """ - try: - logger.debug(f"Async fetch {method.__name__} data for page {page}.") - data = await method(page=page) - if not data: - logger.debug(f"Method {method.__name__} finished fetching data. Ending pagination early.") - methods_finished.add(method) - return [] - return data - except Exception as e: - logger.error(f"Error fetching data from {method.__name__}: {e}") - methods_finished.add(method) - return [] - - async def __async_cache_posters(self, datas: List[dict]): - """ - 异步提取 poster_path 并缓存图片 - :param datas: 数据列表 - """ - if not settings.GLOBAL_IMAGE_CACHE: - return - - tasks = [] - for data in datas: - if global_vars.is_system_stopped: - return - poster_path = data.get("poster_path") - if poster_path: - poster_url = poster_path.replace("original", "w500") - logger.debug(f"Async caching poster image: {poster_url}") - tasks.append(self.__async_fetch_and_save_image(poster_url)) - - # 并发缓存图片 - if tasks: - await asyncio.gather(*tasks, return_exceptions=True) - - @staticmethod - async def __async_fetch_and_save_image(url: str): - """ - 异步请求并保存图片 - :param url: 图片路径 - """ - if not settings.GLOBAL_IMAGE_CACHE or not url: - return - - # 生成缓存路径 - base_path = AsyncPath(settings.CACHE_PATH) - sanitized_path = SecurityUtils.sanitize_url_path(url) - cache_path = base_path / "images" / sanitized_path - - # 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择 - if not cache_path.suffix: - cache_path = cache_path.with_suffix(".jpg") - - # 确保缓存路径和文件类型合法 - if not await SecurityUtils.async_is_safe_path(base_path=base_path, - user_path=cache_path, - allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES): - logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}") - return - - # 本地存在缓存图片,则直接跳过 - if await cache_path.exists(): - logger.debug(f"Cache hit: Image already exists at {cache_path}") - return - - # 请求远程图片 - referer = "https://movie.douban.com/" if "doubanio.com" in url else None - proxies = settings.PROXY if not referer else None - response = await AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT, - proxies=proxies, referer=referer).get_res(url=url) - if not response: - logger.debug(f"Empty response for URL: {url}") - return - - # 验证下载的内容是否为有效图片 - try: - Image.open(io.BytesIO(response.content)).verify() - except Exception as e: - logger.debug(f"Invalid image format for URL {url}: {e}") - return - - if not cache_path: - return - - try: - if not await cache_path.parent.exists(): - await cache_path.parent.mkdir(parents=True, exist_ok=True) - async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file: - await tmp_file.write(response.content) - temp_path = AsyncPath(tmp_file.name) - await temp_path.replace(cache_path) - logger.debug(f"Successfully cached image at {cache_path} for URL: {url}") - except Exception as e: - logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}") - @log_execution_time(logger=logger) @cached(ttl=recommend_ttl, region=recommend_cache_region) async def async_tmdb_movies(self, sort_by: Optional[str] = "popularity.desc", diff --git a/app/core/cache.py b/app/core/cache.py index 5cb208ba..e1bb6087 100644 --- a/app/core/cache.py +++ b/app/core/cache.py @@ -23,7 +23,8 @@ class CacheBackend(ABC): """ @abstractmethod - def set(self, key: str, value: Any, ttl: int, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: + def set(self, key: str, value: Any, ttl: Optional[int] = None, + region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: """ 设置缓存 @@ -76,6 +77,16 @@ class CacheBackend(ABC): """ pass + @abstractmethod + def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + pass + @abstractmethod def close(self) -> None: """ @@ -114,6 +125,10 @@ class CacheBackend(ABC): # 使用有序参数生成缓存键 return f"{func.__name__}_{hashkey(*keys)}" + @staticmethod + def is_redis() -> bool: + return settings.CACHE_BACKEND_TYPE == "redis" + class CacheToolsBackend(CacheBackend): """ @@ -128,7 +143,7 @@ class CacheToolsBackend(CacheBackend): - 不支持按 `key` 独立隔离 TTL 和 Maxsize,仅支持作用于 region 级别 """ - def __init__(self, maxsize: Optional[int] = 512, ttl: Optional[int] = 1800): + def __init__(self, maxsize: Optional[int] = 1024, ttl: Optional[int] = 1800): """ 初始化缓存实例 @@ -226,6 +241,18 @@ class CacheToolsBackend(CacheBackend): region_cache.clear() logger.info("Cleared all cache") + def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + region_cache = self.__get_region_cache(region) + if region_cache is None: + return {} + return dict(region_cache.items()) + def close(self) -> None: """ 内存缓存不需要关闭资源 @@ -247,20 +274,14 @@ class RedisBackend(CacheBackend): - Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存 """ - def __init__(self, redis_url: Optional[str] = "redis://localhost", ttl: Optional[int] = 1800): + def __init__(self, ttl: Optional[int] = 1800): """ 初始化 Redis 缓存实例 - :param redis_url: Redis 服务的 URL :param ttl: 缓存的存活时间,单位秒 """ self.ttl = ttl - self._redis_helper = None - try: - self.redis_helper = RedisHelper(redis_url=redis_url) - except RuntimeError as e: - logger.warning(f"Redis缓存初始化失败: {e}") - raise e + self.redis_helper = RedisHelper() def set(self, key: str, value: Any, ttl: Optional[int] = None, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None: @@ -313,6 +334,15 @@ class RedisBackend(CacheBackend): """ self.redis_helper.clear(region=region) + def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]: + """ + 获取指定区域的所有缓存项 + + :param region: 缓存的区 + :return: 返回一个字典,包含所有缓存键值对 + """ + return self.redis_helper.items(region=region) + def close(self) -> None: """ 关闭 Redis 客户端的连接池 @@ -324,28 +354,16 @@ def get_cache_backend(maxsize: Optional[int] = 512, ttl: Optional[int] = 1800) - """ 根据配置获取缓存后端实例 - :param maxsize: 缓存的最大条目数 + :param maxsize: 缓存的最大条目数,仅使用cachetools时生效 :param ttl: 缓存的默认存活时间,单位秒 :return: 返回缓存后端实例 """ - cache_type = settings.CACHE_BACKEND_TYPE - logger.debug(f"Cache backend type from settings: {cache_type}") - - if cache_type == "redis": - redis_url = settings.CACHE_BACKEND_URL - if redis_url: - try: - logger.debug(f"Attempting to use RedisBackend with URL: {redis_url}, TTL: {ttl}") - return RedisBackend(redis_url=redis_url, ttl=ttl) - except RuntimeError: - logger.warning("Falling back to CacheToolsBackend due to Redis connection failure.") - else: - logger.debug("Cache backend type is redis, but no valid REDIS_URL found. " - "Falling back to CacheToolsBackend.") - - # 如果不是 Redis,回退到内存缓存 - logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}") - return CacheToolsBackend(maxsize=maxsize, ttl=ttl) + if settings.CACHE_BACKEND_TYPE == "redis": + logger.debug(f"Attempting to use RedisBackend, TTL: {ttl}") + return RedisBackend(ttl=ttl) + else: + logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}") + return CacheToolsBackend(maxsize=maxsize, ttl=ttl) class TTLCache: @@ -369,12 +387,12 @@ class TTLCache: self.ttl = ttl self._backend = get_cache_backend(maxsize=maxsize, ttl=ttl) - def __getitem__(self, key): + def __getitem__(self, key: str): """ 获取缓存项 """ try: - value = self._backend.get(str(key)) + value = self._backend.get(key) if value is not None: return value except Exception as e: @@ -382,40 +400,40 @@ class TTLCache: raise KeyError(key) - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any): """ 设置缓存项 """ try: - self._backend.set(str(key), value, ttl=self.ttl) + self._backend.set(key, value, ttl=self.ttl) except Exception as e: logger.warning(f"缓存设置失败: {e}") - def __delitem__(self, key): + def __delitem__(self, key: str): """ 删除缓存项 """ try: - self._backend.delete(str(key)) + self._backend.delete(key) except Exception as e: logger.warning(f"缓存删除失败: {e}") - def __contains__(self, key): + def __contains__(self, key: str): """ 检查键是否存在 """ try: - return self._backend.exists(str(key)) + return self._backend.exists(key) except Exception as e: logger.warning(f"缓存检查失败: {e}") return False - def get(self, key, default=None): + def get(self, key: str, default: Any = None): """ 获取缓存项,如果不存在返回默认值 """ try: - value = self._backend.get(str(key)) + value = self._backend.get(key) if value is not None: return value except Exception as e: @@ -442,10 +460,6 @@ class TTLCache: logger.warning(f"缓存关闭失败: {e}") -# 缓存后端实例 -cache_backend = get_cache_backend() - - def cached(region: Optional[str] = None, maxsize: Optional[int] = 512, ttl: Optional[int] = 1800, skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False): """ @@ -458,6 +472,8 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 512, ttl: Opti :param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False :return: 装饰器函数 """ + # 缓存后端实例 + cache_backend = get_cache_backend(maxsize=maxsize, ttl=ttl) def should_cache(value: Any) -> bool: """ @@ -554,15 +570,3 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 512, ttl: Opti return wrapper return decorator - - -def close_cache() -> None: - """ - 关闭缓存后端连接并清理资源 - """ - try: - if cache_backend: - cache_backend.close() - logger.info("Cache backend closed successfully.") - except Exception as e: - logger.info(f"Error while closing cache backend: {e}") diff --git a/app/helper/redis.py b/app/helper/redis.py index 35ca7363..618e3e46 100644 --- a/app/helper/redis.py +++ b/app/helper/redis.py @@ -4,17 +4,19 @@ from typing import Any, Optional from urllib.parse import quote import redis +from redis.asyncio import Redis from app.core.config import settings from app.core.event import eventmanager, Event from app.log import logger from app.schemas import ConfigChangeEventData from app.schemas.types import EventType +from app.utils.singleton import Singleton -class RedisHelper: +class RedisHelper(metaclass=Singleton): """ - Redis连接和操作助手类 + Redis连接和操作助手类,单例模式 特性: - 管理Redis连接池和客户端 @@ -27,32 +29,30 @@ class RedisHelper: _complex_serializable_types = set() _simple_serializable_types = set() - def __init__(self, redis_url: Optional[str] = "redis://localhost"): + def __init__(self): """ 初始化Redis助手实例 - - :param redis_url: Redis服务的URL """ - self.redis_url = redis_url + self.redis_url = settings.CACHE_BACKEND_URL self.client = None - self._connect() def _connect(self): """ 建立Redis连接 """ try: - self.client = redis.Redis.from_url( - self.redis_url, - decode_responses=False, - socket_timeout=30, - socket_connect_timeout=5, - health_check_interval=60, - ) - # 测试连接,确保Redis可用 - self.client.ping() - logger.info(f"Successfully connected to Redis:{self.redis_url}") - self.set_memory_limit() + if self.client is None: + self.client = redis.Redis.from_url( + self.redis_url, + decode_responses=False, + socket_timeout=30, + socket_connect_timeout=5, + health_check_interval=60, + ) + # 测试连接,确保Redis可用 + self.client.ping() + logger.info(f"Successfully connected to Redis:{self.redis_url}") + self.set_memory_limit() except Exception as e: logger.error(f"Failed to connect to Redis: {e}") raise RuntimeError("Redis connection failed") from e @@ -69,6 +69,7 @@ class RedisHelper: if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']: return logger.info("配置变更,重连Redis...") + self.close() self._connect() def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"): @@ -164,6 +165,7 @@ class RedisHelper: :param kwargs: 其他参数 """ try: + self._connect() redis_key = self.get_redis_key(region, key) # 对值进行序列化 serialized_value = self.serialize(value) @@ -181,6 +183,7 @@ class RedisHelper: :return: 存在返回True,否则返回False """ try: + self._connect() redis_key = self.get_redis_key(region, key) return self.client.exists(redis_key) == 1 except Exception as e: @@ -196,6 +199,7 @@ class RedisHelper: :return: 返回缓存的值,如果缓存不存在返回None """ try: + self._connect() redis_key = self.get_redis_key(region, key) value = self.client.get(redis_key) if value is not None: @@ -213,6 +217,7 @@ class RedisHelper: :param region: 缓存的区 """ try: + self._connect() redis_key = self.get_redis_key(region, key) self.client.delete(redis_key) except Exception as e: @@ -225,6 +230,7 @@ class RedisHelper: :param region: 缓存的区 """ try: + self._connect() if region: cache_region = self.get_region(quote(region)) redis_key = f"{cache_region}:key:*" @@ -239,10 +245,302 @@ class RedisHelper: except Exception as e: logger.error(f"Failed to clear cache, region: {region}, error: {e}") + def items(self, region: Optional[str] = None): + """ + 获取指定区域的所有缓存键值对 + + :param region: 缓存的区 + :return: 返回键值对生成器 + """ + try: + self._connect() + if region: + cache_region = self.get_region(quote(region)) + redis_key = f"{cache_region}:key:*" + for key in self.client.scan_iter(redis_key): + value = self.client.get(key) + if value is not None: + yield key, self.deserialize(value) + else: + for key in self.client.scan_iter("*"): + value = self.client.get(key) + if value is not None: + yield key, self.deserialize(value) + except Exception as e: + logger.error(f"Failed to get items from Redis, region: {region}, error: {e}") + + def test(self) -> bool: + """ + 测试Redis连接性 + """ + try: + self._connect() + return True + except Exception as e: + logger.error(f"Redis connection test failed: {e}") + return False + def close(self) -> None: """ 关闭Redis客户端的连接池 """ if self.client: self.client.close() + self.client = None logger.debug("Redis connection closed") + + +class AsyncRedisHelper(metaclass=Singleton): + """ + 异步Redis连接和操作助手类,单例模式 + + 特性: + - 管理异步Redis连接池和客户端 + - 提供序列化和反序列化功能 + - 支持内存限制和淘汰策略设置 + - 提供键名生成和区域管理功能 + - 所有操作都是异步的 + """ + + # 类型缓存集合,针对非容器简单类型 + _complex_serializable_types = set() + _simple_serializable_types = set() + + def __init__(self, redis_url: Optional[str] = "redis://localhost"): + """ + 初始化异步Redis助手实例 + + :param redis_url: Redis服务的URL + """ + self.redis_url = redis_url + self.client: Optional[Redis] = None + + async def _connect(self): + """ + 建立异步Redis连接 + """ + try: + if self.client is None: + self.client = Redis.from_url( + self.redis_url, + decode_responses=False, + socket_timeout=30, + socket_connect_timeout=5, + health_check_interval=60, + ) + # 测试连接,确保Redis可用 + await self.client.ping() + logger.info(f"Successfully connected to Redis (async):{self.redis_url}") + await self.set_memory_limit() + except Exception as e: + logger.error(f"Failed to connect to Redis (async): {e}") + raise RuntimeError("Redis async connection failed") from e + + @eventmanager.register(EventType.ConfigChanged) + async def handle_config_changed(self, event: Event): + """ + 处理配置变更事件,更新Redis设置 + :param event: 事件对象 + """ + if not event: + return + event_data: ConfigChangeEventData = event.event_data + if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']: + return + logger.info("配置变更,重连Redis (async)...") + await self.close() + await self._connect() + + async def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"): + """ + 动态设置Redis最大内存和内存淘汰策略 + + :param policy: 淘汰策略(如'allkeys-lru') + """ + try: + # 如果有显式值,则直接使用,为0时说明不限制,如果未配置,开启BIG_MEMORY_MODE时为"1024mb",未开启时为"256mb" + maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb") + await self.client.config_set("maxmemory", maxmemory) + await self.client.config_set("maxmemory-policy", policy) + logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy} (async)") + except Exception as e: + logger.error(f"Failed to set Redis maxmemory or policy (async): {e}") + + @staticmethod + def is_container_type(t): + """ + 判断是否为容器类型 + """ + return t in (list, dict, tuple, set) + + @classmethod + def serialize(cls, value: Any) -> bytes: + """ + 将值序列化为二进制数据,根据序列化方式标识格式 + """ + vt = type(value) + # 针对非容器类型使用缓存策略 + if not cls.is_container_type(vt): + # 如果已知需要复杂序列化 + if vt in cls._complex_serializable_types: + return b"PICKLE" + b"\x00" + pickle.dumps(value) + # 如果已知可以简单序列化 + if vt in cls._simple_serializable_types: + json_data = json.dumps(value).encode("utf-8") + return b"JSON" + b"\x00" + json_data + # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 + try: + json_data = json.dumps(value).encode("utf-8") + cls._simple_serializable_types.add(vt) + return b"JSON" + b"\x00" + json_data + except TypeError: + cls._complex_serializable_types.add(vt) + return b"PICKLE" + b"\x00" + pickle.dumps(value) + # 针对容器类型,每次尝试简单序列化,不使用缓存 + else: + try: + json_data = json.dumps(value).encode("utf-8") + return b"JSON" + b"\x00" + json_data + except TypeError: + return b"PICKLE" + b"\x00" + pickle.dumps(value) + + @classmethod + def deserialize(cls, value: bytes) -> Any: + """ + 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 + """ + format_marker, data = value.split(b"\x00", 1) + if format_marker == b"JSON": + return json.loads(data.decode("utf-8")) + elif format_marker == b"PICKLE": + return pickle.loads(data) + else: + raise ValueError("Unknown serialization format") + + @staticmethod + def get_region(region: Optional[str] = "DEFAULT"): + """ + 获取缓存的区 + """ + return f"region:{region}" if region else "region:default" + + def get_redis_key(self, region: str, key: str) -> str: + """ + 获取缓存Key + """ + # 使用region作为缓存键的一部分 + region = self.get_region(quote(region)) + return f"{region}:key:{quote(key)}" + + async def set(self, key: str, value: Any, ttl: Optional[int] = None, + region: Optional[str] = "DEFAULT", **kwargs) -> None: + """ + 异步设置缓存 + + :param key: 缓存的键 + :param value: 缓存的值 + :param ttl: 缓存的存活时间,单位秒 + :param region: 缓存的区 + :param kwargs: 其他参数 + """ + try: + await self._connect() + redis_key = self.get_redis_key(region, key) + # 对值进行序列化 + serialized_value = self.serialize(value) + kwargs.pop("maxsize", None) + await self.client.set(redis_key, serialized_value, ex=ttl, **kwargs) + except Exception as e: + logger.error(f"Failed to set key (async): {key} in region: {region}, error: {e}") + + async def exists(self, key: str, region: Optional[str] = "DEFAULT") -> bool: + """ + 异步判断缓存键是否存在 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 存在返回True,否则返回False + """ + try: + await self._connect() + redis_key = self.get_redis_key(region, key) + result = await self.client.exists(redis_key) + return result == 1 + except Exception as e: + logger.error(f"Failed to exists key (async): {key} region: {region}, error: {e}") + return False + + async def get(self, key: str, region: Optional[str] = "DEFAULT") -> Optional[Any]: + """ + 异步获取缓存的值 + + :param key: 缓存的键 + :param region: 缓存的区 + :return: 返回缓存的值,如果缓存不存在返回None + """ + try: + await self._connect() + redis_key = self.get_redis_key(region, key) + value = await self.client.get(redis_key) + if value is not None: + return self.deserialize(value) + return None + except Exception as e: + logger.error(f"Failed to get key (async): {key} in region: {region}, error: {e}") + return None + + async def delete(self, key: str, region: Optional[str] = "DEFAULT") -> None: + """ + 异步删除缓存 + + :param key: 缓存的键 + :param region: 缓存的区 + """ + try: + await self._connect() + redis_key = self.get_redis_key(region, key) + await self.client.delete(redis_key) + except Exception as e: + logger.error(f"Failed to delete key (async): {key} in region: {region}, error: {e}") + + async def clear(self, region: Optional[str] = None) -> None: + """ + 异步清除指定区域的缓存或全部缓存 + + :param region: 缓存的区 + """ + try: + await self._connect() + if region: + cache_region = self.get_region(quote(region)) + redis_key = f"{cache_region}:key:*" + async with self.client.pipeline() as pipe: + async for key in self.client.scan_iter(redis_key): + await pipe.delete(key) + await pipe.execute() + logger.info(f"Cleared Redis cache for region (async): {region}") + else: + await self.client.flushdb() + logger.info("Cleared all Redis cache (async)") + except Exception as e: + logger.error(f"Failed to clear cache (async), region: {region}, error: {e}") + + async def test(self) -> bool: + """ + 异步测试Redis连接性 + """ + try: + await self._connect() + return True + except Exception as e: + logger.error(f"Redis async connection test failed: {e}") + return False + + async def close(self) -> None: + """ + 关闭异步Redis客户端的连接池 + """ + if self.client: + await self.client.close() + self.client = None + logger.debug("Redis async connection closed") diff --git a/app/helper/subscribe.py b/app/helper/subscribe.py index 8d5cd238..a499ca33 100644 --- a/app/helper/subscribe.py +++ b/app/helper/subscribe.py @@ -1,7 +1,7 @@ from threading import Thread from typing import List, Tuple, Optional -from app.core.cache import cached, cache_backend +from app.core.cache import cached from app.core.config import settings from app.db.subscribe_oper import SubscribeOper from app.db.systemconfig_oper import SystemConfigOper @@ -111,7 +111,12 @@ class SubscribeHelper(metaclass=WeakSingleton): if res and res.status_code == 200: # 清除缓存 if clear_cache: - cache_backend.clear(region=self._shares_cache_region) + self.get_shares.cache_clear() + self.get_statistic.cache_clear() + self.get_share_statistics.cache_clear() + self.async_get_shares.cache_clear() + self.async_get_statistic.cache_clear() + self.async_get_share_statistics.cache_clear() return True, "" else: return False, res.json().get("message") diff --git a/app/helper/workflow.py b/app/helper/workflow.py index 55220708..5a5f582d 100644 --- a/app/helper/workflow.py +++ b/app/helper/workflow.py @@ -1,7 +1,7 @@ import json from typing import List, Tuple, Optional -from app.core.cache import cached, cache_backend +from app.core.cache import cached from app.core.config import settings from app.db.models import Workflow from app.db.workflow_oper import WorkflowOper @@ -89,7 +89,8 @@ class WorkflowHelper(metaclass=WeakSingleton): if success: # 清除缓存 if clear_cache: - cache_backend.clear(region=self._shares_cache_region) + self.get_shares.cache_clear() + self.async_get_shares.cache_clear() return True, "" else: try: diff --git a/app/modules/douban/douban_cache.py b/app/modules/douban/douban_cache.py index 370b3dd4..6fa3b777 100644 --- a/app/modules/douban/douban_cache.py +++ b/app/modules/douban/douban_cache.py @@ -1,24 +1,19 @@ import pickle -import random -import time import traceback from pathlib import Path from threading import RLock from typing import Optional +from app.core.cache import get_cache_backend from app.core.config import settings from app.core.meta import MetaBase from app.core.metainfo import MetaInfo -from app.helper.redis import RedisHelper from app.log import logger from app.schemas.types import MediaType from app.utils.singleton import WeakSingleton lock = RLock() -CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp" -EXPIRE_TIMESTAMP = settings.CONF.meta - class DoubanCache(metaclass=WeakSingleton): """ @@ -34,32 +29,23 @@ class DoubanCache(metaclass=WeakSingleton): _douban_cache_expire: bool = True def __init__(self): - # 初始化Redis缓存助手 - self._redis_helper = None - if settings.CACHE_BACKEND_TYPE == "redis": - try: - self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL) - except RuntimeError as e: - logger.warning(f"豆瓣缓存Redis初始化失败,将使用本地缓存: {e}") - self._redis_helper = None - # 加载本地缓存数据 - self._meta_path = settings.TEMP_PATH / "__douban_cache__" - if not self._redis_helper: - self._meta_data = self.__load(self._meta_path) + self.maxsize = settings.CONF.douban + self.ttl = settings.CONF.meta + self.region = "__douban_cache__" + self._meta_filepath = settings.TEMP_PATH / self.region + # 初始化缓存 + self._cache = get_cache_backend(maxsize=self.maxsize, ttl=self.ttl) + # 非Redis加载本地缓存数据 + if not self._cache.is_redis(): + for key, value in self.__load(self._meta_filepath).items(): + self._cache.set(key, value) def clear(self): """ 清空所有豆瓣缓存 """ with lock: - self._meta_data = {} - # 如果Redis可用,同时清理Redis缓存 - if self._redis_helper: - try: - self._redis_helper.clear(region="douban_cache") - logger.debug("已清理豆瓣Redis缓存") - except Exception as e: - logger.warning(f"清理豆瓣Redis缓存失败: {e}") + self._cache.clear(region=self.region) @staticmethod def __get_key(meta: MetaBase) -> str: @@ -74,28 +60,8 @@ class DoubanCache(metaclass=WeakSingleton): 根据KEY值获取缓存值 """ key = self.__get_key(meta) - - if self._redis_helper: - # 如果Redis可用,从Redis读取 - try: - redis_data = self._redis_helper.get(key, region="douban_cache") - return redis_data or {} - except Exception as e: - logger.warning(f"从Redis获取豆瓣缓存失败: {e}") - else: - # Redis不可用时,从内存缓存读取 - with lock: - info: dict = self._meta_data.get(key) - if info: - # 检查过期时间 - expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR) - if not expire or int(time.time()) < expire: - info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - self._meta_data[key] = info - elif expire and self._douban_cache_expire: - self.delete(key) - return info or {} - return {} + with lock: + return self._cache.get(key, region=self.region) or {} def delete(self, key: str) -> dict: """ @@ -103,18 +69,12 @@ class DoubanCache(metaclass=WeakSingleton): @param key: 缓存key @return: 被删除的缓存内容 """ - if self._redis_helper: - # 如果Redis可用,删除Redis缓存 - try: - self._redis_helper.delete(key, region="douban_cache") - return {} - except Exception as e: - logger.warning(f"删除豆瓣Redis缓存失败: {e}") - return {} - else: - # Redis不可用时,删除内存缓存 - with lock: - return self._meta_data.pop(key, {}) + with lock: + redis_data = self._cache.get(key, region=self.region) + if redis_data: + self._cache.delete(key, region=self.region) + return redis_data + return {} def modify(self, key: str, title: str) -> dict: """ @@ -123,24 +83,13 @@ class DoubanCache(metaclass=WeakSingleton): @param title: 标题 @return: 被修改后缓存内容 """ - if self._redis_helper: - # 如果Redis可用,修改Redis缓存 - try: - redis_data = self._redis_helper.get(key, region="douban_cache") - if redis_data: - redis_data['title'] = title - self._redis_helper.set(key, redis_data, ttl=EXPIRE_TIMESTAMP, region="douban_cache") - return redis_data - except Exception as e: - logger.warning(f"修改豆瓣Redis缓存失败: {e}") + with lock: + redis_data = self._cache.get(key, region=self.region) + if redis_data: + redis_data["title"] = title + self._cache.set(key, redis_data, region=self.region) + return redis_data return {} - else: - # Redis不可用时,修改内存缓存 - with lock: - if self._meta_data.get(key): - self._meta_data[key]['title'] = title - self._meta_data[key][CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - return self._meta_data.get(key) @staticmethod def __load(path: Path) -> dict: @@ -152,10 +101,9 @@ class DoubanCache(metaclass=WeakSingleton): with open(path, 'rb') as f: data = pickle.load(f) return data - return {} except Exception as e: logger.error(f"加载缓存失败: {str(e)} - {traceback.format_exc()}") - return {} + return {} def update(self, meta: MetaBase, info: dict) -> None: """ @@ -184,99 +132,41 @@ class DoubanCache(metaclass=WeakSingleton): if not poster_path and info.get("cover"): poster_path = info.get("cover").get("url") - if self._redis_helper: - # 如果Redis可用,保存到Redis - cache_data = { + with lock: + self._cache.set(self.__get_key(meta), { "id": info.get("id"), "type": mtype, "year": cache_year, "title": cache_title, "poster_path": poster_path - } - try: - self._redis_helper.set(self.__get_key(meta), cache_data, ttl=EXPIRE_TIMESTAMP, - region="douban_cache") - except Exception as e: - logger.warning(f"保存豆瓣缓存到Redis失败: {e}") - else: - # Redis不可用时,保存到内存缓存 - with lock: - cache_data = { - "id": info.get("id"), - "type": mtype, - "year": cache_year, - "title": cache_title, - "poster_path": poster_path, - CACHE_EXPIRE_TIMESTAMP_STR: int(time.time()) + EXPIRE_TIMESTAMP - } - self._meta_data[self.__get_key(meta)] = cache_data + }, region=self.region) elif info is not None: # None时不缓存,此时代表网络错误,允许重复请求 - if self._redis_helper: - try: - self._redis_helper.set(self.__get_key(meta), {'id': "0"}, ttl=EXPIRE_TIMESTAMP, - region="douban_cache") - except Exception as e: - logger.warning(f"保存豆瓣缓存到Redis失败: {e}") - else: - with lock: - self._meta_data[self.__get_key(meta)] = {'id': "0"} + with lock: + self._cache.set(self.__get_key(meta), { + "id": 0 + }, region=self.region) def save(self, force: Optional[bool] = False) -> None: """ 保存缓存数据到文件 """ - # 如果Redis可用,不需要保存到本地文件 - if self._redis_helper: + # Redis不需要保存到本地文件 + if self._cache.is_redis(): return - # Redis不可用时,保存到本地文件 - meta_data = self.__load(self._meta_path) - new_meta_data = {k: v for k, v in self._meta_data.items() if v.get("id")} + # 本地文件 + meta_data = self.__load(self._meta_filepath) + # 当前缓存数据(去除无法识别) + new_meta_data = {k: v for k, v in self._cache.items(region=self.region) if v.get("id")} if not force \ - and not self._random_sample(new_meta_data) \ and meta_data.keys() == new_meta_data.keys(): return - - with open(self._meta_path, 'wb') as f: + # 写入本地 + with open(self._meta_filepath, 'wb') as f: pickle.dump(new_meta_data, f, pickle.HIGHEST_PROTOCOL) # noqa - def _random_sample(self, new_meta_data: dict) -> bool: - """ - 采样分析是否需要保存 - """ - ret = False - if len(new_meta_data) < 25: - keys = list(new_meta_data.keys()) - for k in keys: - info = new_meta_data.get(k) - expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR) - if not expire: - ret = True - info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - elif int(time.time()) >= expire: - ret = True - if self._douban_cache_expire: - new_meta_data.pop(k) - else: - count = 0 - keys = random.sample(sorted(new_meta_data.keys()), 25) - for k in keys: - info = new_meta_data.get(k) - expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR) - if not expire: - ret = True - info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - elif int(time.time()) >= expire: - ret = True - if self._douban_cache_expire: - new_meta_data.pop(k) - count += 1 - if count >= 5: - ret |= self._random_sample(new_meta_data) - return ret - def __del__(self): self.save() diff --git a/app/modules/redis/__init__.py b/app/modules/redis/__init__.py index 54fb399a..4ed0a0ed 100644 --- a/app/modules/redis/__init__.py +++ b/app/modules/redis/__init__.py @@ -51,9 +51,10 @@ class RedisModule(_ModuleBase): """ if settings.CACHE_BACKEND_TYPE != "redis": return None + redis_helper = RedisHelper() try: - redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL) + if redis_helper.test(): + return True, "" + return False, "Redis连接失败,请检查配置" + finally: redis_helper.close() - except RuntimeError as e: - return False, f"Redis连接失败:{e}" - return True, "" diff --git a/app/modules/themoviedb/tmdb_cache.py b/app/modules/themoviedb/tmdb_cache.py index 13482617..b0ceaca8 100644 --- a/app/modules/themoviedb/tmdb_cache.py +++ b/app/modules/themoviedb/tmdb_cache.py @@ -1,22 +1,17 @@ import pickle -import random -import time import traceback from pathlib import Path from threading import RLock +from app.core.cache import get_cache_backend from app.core.config import settings from app.core.meta import MetaBase -from app.helper.redis import RedisHelper from app.log import logger from app.schemas.types import MediaType from app.utils.singleton import WeakSingleton lock = RLock() -CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp" -EXPIRE_TIMESTAMP = settings.CONF.meta - class TmdbCache(metaclass=WeakSingleton): """ @@ -32,33 +27,23 @@ class TmdbCache(metaclass=WeakSingleton): _tmdb_cache_expire: bool = True def __init__(self): - # 初始化Redis缓存助手 - self._redis_helper = None - if settings.CACHE_BACKEND_TYPE == "redis": - try: - self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL) - except RuntimeError as e: - logger.warning(f"TMDB缓存Redis初始化失败,将使用本地缓存: {e}") - self._redis_helper = None - - # 加载缓存数据 - self._meta_path = settings.TEMP_PATH / "__tmdb_cache__" - if not self._redis_helper: - self._meta_data = self.__load(self._meta_path) + self.maxsize = settings.CONF.douban + self.ttl = settings.CONF.meta + self.region = "__tmdb_cache__" + self._meta_filepath = settings.TEMP_PATH / self.region + # 初始化缓存 + self._cache = get_cache_backend(maxsize=self.maxsize, ttl=self.ttl) + # 非Redis加载本地缓存数据 + if not self._cache.is_redis(): + for key, value in self.__load(self._meta_filepath).items(): + self._cache.set(key, value) def clear(self): """ 清空所有TMDB缓存 """ with lock: - self._meta_data = {} - # 如果Redis可用,同时清理Redis缓存 - if self._redis_helper: - try: - self._redis_helper.clear(region="tmdb_cache") - logger.debug("已清理TMDB Redis缓存") - except Exception as e: - logger.warning(f"清理TMDB Redis缓存失败: {e}") + self._cache.clear(region=self.region) @staticmethod def __get_key(meta: MetaBase) -> str: @@ -73,27 +58,8 @@ class TmdbCache(metaclass=WeakSingleton): """ key = self.__get_key(meta) - if self._redis_helper: - # 如果Redis可用,从Redis读取 - try: - redis_data = self._redis_helper.get(key, region="tmdb_cache") - return redis_data or {} - except Exception as e: - logger.warning(f"从Redis获取TMDB缓存失败: {e}") - else: - # Redis不可用时,从内存缓存读取 - with lock: - info: dict = self._meta_data.get(key) - if info: - # 检查过期时间 - expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR) - if not expire or int(time.time()) < expire: - info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - self._meta_data[key] = info - elif expire and self._tmdb_cache_expire: - self.delete(key) - return info or {} - return {} + with lock: + return self._cache.get(key, region=self.region) or {} def delete(self, key: str) -> dict: """ @@ -101,18 +67,12 @@ class TmdbCache(metaclass=WeakSingleton): @param key: 缓存key @return: 被删除的缓存内容 """ - if self._redis_helper: - # 如果Redis可用,删除Redis缓存 - try: - self._redis_helper.delete(key, region="tmdb_cache") - return {} - except Exception as e: - logger.warning(f"删除TMDB Redis缓存失败: {e}") - return {} - else: - # Redis不可用时,删除内存缓存 - with lock: - return self._meta_data.pop(key, {}) + with lock: + redis_data = self._cache.get(key, region=self.region) + if redis_data: + self._cache.delete(key, region=self.region) + return redis_data + return {} def modify(self, key: str, title: str) -> dict: """ @@ -121,24 +81,13 @@ class TmdbCache(metaclass=WeakSingleton): @param title: 标题 @return: 被修改后缓存内容 """ - if self._redis_helper: - # 如果Redis可用,修改Redis缓存 - try: - redis_data = self._redis_helper.get(key, region="tmdb_cache") - if redis_data: - redis_data['title'] = title - self._redis_helper.set(key, redis_data, ttl=EXPIRE_TIMESTAMP, region="tmdb_cache") - return redis_data - except Exception as e: - logger.warning(f"修改TMDB Redis缓存失败: {e}") + with lock: + redis_data = self._cache.get(key, region=self.region) + if redis_data: + redis_data['title'] = title + self._cache.set(key, redis_data, region=self.region) + return redis_data return {} - else: - # Redis不可用时,修改内存缓存 - with lock: - if self._meta_data.get(key): - self._meta_data[key]['title'] = title - self._meta_data[key][CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - return self._meta_data.get(key) @staticmethod def __load(path: Path) -> dict: @@ -158,6 +107,7 @@ class TmdbCache(metaclass=WeakSingleton): """ 新增或更新缓存条目 """ + key = self.__get_key(meta) if info: # 缓存标题 cache_title = info.get("title") \ @@ -168,8 +118,8 @@ class TmdbCache(metaclass=WeakSingleton): if cache_year: cache_year = cache_year[:4] - if self._redis_helper: - # 如果Redis可用,保存到Redis + with lock: + # 缓存数据 cache_data = { "id": info.get("id"), "type": info.get("media_type"), @@ -178,89 +128,32 @@ class TmdbCache(metaclass=WeakSingleton): "poster_path": info.get("poster_path"), "backdrop_path": info.get("backdrop_path") } - try: - self._redis_helper.set(self.__get_key(meta), cache_data, ttl=EXPIRE_TIMESTAMP, region="tmdb_cache") - except Exception as e: - logger.warning(f"保存TMDB缓存到Redis失败: {e}") - else: - # Redis不可用时,保存到内存缓存 - with lock: - cache_data = { - "id": info.get("id"), - "type": info.get("media_type"), - "year": cache_year, - "title": cache_title, - "poster_path": info.get("poster_path"), - "backdrop_path": info.get("backdrop_path"), - CACHE_EXPIRE_TIMESTAMP_STR: int(time.time()) + EXPIRE_TIMESTAMP - } - self._meta_data[self.__get_key(meta)] = cache_data + self._cache.set(key, cache_data, region=self.region) elif info is not None: # None时不缓存,此时代表网络错误,允许重复请求 - if self._redis_helper: - try: - self._redis_helper.set(self.__get_key(meta), {'id': 0}, ttl=EXPIRE_TIMESTAMP, region="tmdb_cache") - except Exception as e: - logger.warning(f"保存TMDB缓存到Redis失败: {e}") - else: - with lock: - self._meta_data[self.__get_key(meta)] = {'id': 0} + with lock: + self._cache.set(key, {"id": 0}, region=self.region) def save(self, force: bool = False) -> None: """ 保存缓存数据到文件 """ - # 如果Redis可用,不需要保存到本地文件 - if self._redis_helper: + # Redis不需要保存到本地文件 + if self._cache.is_redis(): return # Redis不可用时,保存到本地文件 - meta_data = self.__load(self._meta_path) - new_meta_data = {k: v for k, v in self._meta_data.items() if v.get("id")} + meta_data = self.__load(self._meta_filepath) + # 当前缓存,去除无法识别 + new_meta_data = {k: v for k, v in self._cache.items(region=self.region) if v.get("id")} if not force \ - and not self._random_sample(new_meta_data) \ and meta_data.keys() == new_meta_data.keys(): return - with open(self._meta_path, 'wb') as f: + with open(self._meta_filepath, 'wb') as f: pickle.dump(new_meta_data, f, pickle.HIGHEST_PROTOCOL) # type: ignore - def _random_sample(self, new_meta_data: dict) -> bool: - """ - 采样分析是否需要保存 - """ - ret = False - if len(new_meta_data) < 25: - keys = list(new_meta_data.keys()) - for k in keys: - info = new_meta_data.get(k) - expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR) - if not expire: - ret = True - info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - elif int(time.time()) >= expire: - ret = True - if self._tmdb_cache_expire: - new_meta_data.pop(k) - else: - count = 0 - keys = random.sample(sorted(new_meta_data.keys()), 25) - for k in keys: - info = new_meta_data.get(k) - expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR) - if not expire: - ret = True - info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP - elif int(time.time()) >= expire: - ret = True - if self._tmdb_cache_expire: - new_meta_data.pop(k) - count += 1 - if count >= 5: - ret |= self._random_sample(new_meta_data) - return ret - def __del__(self): self.save() diff --git a/app/scheduler.py b/app/scheduler.py index ac1efdfe..248d5e91 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -18,7 +18,7 @@ from app.chain.subscribe import SubscribeChain from app.chain.transfer import TransferChain from app.chain.workflow import WorkflowChain from app.core.config import settings -from app.core.event import EventManager, eventmanager, Event +from app.core.event import eventmanager, Event from app.core.plugin import PluginManager from app.db.systemconfig_oper import SystemConfigOper from app.helper.message import MessageHelper diff --git a/app/startup/modules_initializer.py b/app/startup/modules_initializer.py index 0c201f2d..a8612e2c 100644 --- a/app/startup/modules_initializer.py +++ b/app/startup/modules_initializer.py @@ -1,5 +1,7 @@ import sys +from app.helper.redis import RedisHelper, AsyncRedisHelper + # SitesHelper涉及资源包拉取,提前引入并容错提示 try: from app.helper.sites import SitesHelper # noqa @@ -12,7 +14,6 @@ except ImportError as e: from app.utils.system import SystemUtils from app.log import logger from app.core.config import settings -from app.core.cache import close_cache from app.core.module import ModuleManager from app.core.event import EventManager from app.helper.thread import ThreadHelper @@ -119,8 +120,9 @@ async def stop_modules(): ThreadHelper().shutdown() # 停止消息服务 stop_message() - # 停止缓存连接 - close_cache() + # 关闭Redis缓存连接 + RedisHelper().close() + await AsyncRedisHelper().close() # 停止数据库连接 await close_database() # 停止前端服务