diff --git a/app/core/cache.py b/app/core/cache.py
index 2b187163..91f60427 100644
--- a/app/core/cache.py
+++ b/app/core/cache.py
@@ -1,13 +1,18 @@
 import inspect
-import os
+import json
+import pickle
 from abc import ABC, abstractmethod
 from functools import wraps
 from typing import Any, Dict, Optional
+from urllib.parse import quote
 
 import redis
 from cachetools import TTLCache
 from cachetools.keys import hashkey
 
+from app.core.config import settings
+from app.log import logger
+
 # 默认缓存区
 DEFAULT_CACHE_REGION = "DEFAULT"
 
@@ -60,6 +65,13 @@ class CacheBackend(ABC):
         """
         pass
 
+    @abstractmethod
+    def close(self) -> None:
+        """
+        关闭缓存连接
+        """
+        pass
+
     @staticmethod
     def get_region(region: str = DEFAULT_CACHE_REGION):
         """
@@ -154,15 +166,32 @@ class CacheToolsBackend(CacheBackend):
         region_cache = self.__get_region_cache(region)
         if region_cache:
             region_cache.clear()
+            logger.info(f"Cleared cache for region: {region}")
         else:
             # 清除所有区域的缓存
             for region_cache in self._region_caches.values():
                 region_cache.clear()
+            logger.info("Cleared all cache")
+
+    def close(self) -> None:
+        """
+        内存缓存不需要关闭资源
+        """
+        pass
 
 
 class RedisBackend(CacheBackend):
     """
     基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存
+
+    特性:
+    - 支持动态设置缓存的 TTL(Time To Live,存活时间)
+    - 支持分区域(region)管理缓存,不同的 region 采用独立的命名空间
+    - 支持自定义最大内存限制(maxmemory)和内存淘汰策略(如 allkeys-lru)
+
+    限制:
+    - 由于 Redis 的分布式特性,写入和读取可能受到网络延迟的影响
+    - Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存
     """
 
     def __init__(self, redis_url: str = "redis://localhost", ttl: int = 1800):
@@ -174,15 +203,76 @@ class RedisBackend(CacheBackend):
         """
         self.redis_url = redis_url
         self.ttl = ttl
-        self.client = redis.StrictRedis.from_url(redis_url)
+        try:
+            self.client = redis.Redis.from_url(
+                redis_url,
+                decode_responses=False,
+                socket_timeout=30,
+                socket_connect_timeout=5,
+                health_check_interval=60,
+            )
+            # 测试连接,确保 Redis 可用
+            self.client.ping()
+            logger.debug(f"Successfully connected to Redis")
+            self.set_memory_limit()
+        except Exception as e:
+            logger.error(f"Failed to connect to Redis: {e}")
+            raise RuntimeError("Redis connection failed") from e
+
+    def set_memory_limit(self, policy: str = "allkeys-lru"):
+        """
+        动态设置 Redis 最大内存和内存淘汰策略
+        :param policy: 淘汰策略(如 'allkeys-lru')
+        """
+        try:
+            # 如果有显式值,则直接使用,为 0 时说明不限制,如果未配置,开启 BIG_MEMORY_MODE 时为 "1024mb",未开启时为 "256mb"
+            maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb")
+            self.client.config_set("maxmemory", maxmemory)
+            self.client.config_set("maxmemory-policy", policy)
+            logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}")
+        except Exception as e:
+            logger.error(f"Failed to set Redis maxmemory or policy: {e}")
 
     @staticmethod
-    def get_redis_key(region, key):
+    def serialize(value: Any) -> bytes:
+        """
+        将值序列化为二进制数据,根据序列化方式标识格式
+        """
+        try:
+            # 尝试 JSON 序列化
+            return b"JSON" + b"\x00" + json.dumps(value).encode("utf-8")
+        except TypeError:
+            # 如果 JSON 序列化失败,使用 Pickle 序列化
+            return b"PICKLE" + b"\x00" + pickle.dumps(value)
+
+    @staticmethod
+    def deserialize(value: bytes) -> Any:
+        """
+        将二进制数据反序列化为原始值,根据格式标识区分序列化方式
+        """
+        format_marker, data = value.split(b"\x00", 1)
+        if format_marker == b"JSON":
+            return json.loads(data.decode("utf-8"))
+        elif format_marker == b"PICKLE":
+            return pickle.loads(data)
+        else:
+            raise ValueError("Unknown serialization format")
+
+    # @staticmethod
+    # def serialize(value: Any) -> bytes:
+    #     return msgpack.packb(value, use_bin_type=True)
+    #
+    # @staticmethod
+    # def deserialize(value: bytes) -> Any:
+    #     return msgpack.unpackb(value, raw=False)
+
+    def get_redis_key(self, region: str, key: str) -> str:
         """
         获取缓存 Key
         """
         # 使用 region 作为缓存键的一部分
-        return f"region:{region}:key:{key}"
+        region = self.get_region(quote(region))
+        return f"{region}:key:{quote(key)}"
 
     def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
         """
@@ -194,11 +284,17 @@ class RedisBackend(CacheBackend):
         :param region: 缓存的区
         :param kwargs: kwargs
         """
-        ttl = ttl or self.ttl
-        redis_key = self.get_redis_key(region, key)
-        self.client.setex(redis_key, ttl, value)
+        try:
+            ttl = ttl or self.ttl
+            redis_key = self.get_redis_key(region, key)
+            # 对值进行序列化
+            serialized_value = self.serialize(value)
+            kwargs.pop("maxsize", None)
+            self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
+        except Exception as e:
+            logger.error(f"Failed to set key: {key} in region: {region}, error: {e}")
 
-    def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
+    def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Optional[Any]:
         """
         获取缓存的值
 
@@ -206,9 +302,15 @@ class RedisBackend(CacheBackend):
         :param region: 缓存的区
         :return: 返回缓存的值,如果缓存不存在返回 None
         """
-        redis_key = self.get_redis_key(region, key)
-        value = self.client.get(redis_key)
-        return value
+        try:
+            redis_key = self.get_redis_key(region, key)
+            value = self.client.get(redis_key)
+            if value is not None:
+                return self.deserialize(value)  # noqa
+            return None
+        except Exception as e:
+            logger.error(f"Failed to get key: {key} in region: {region}, error: {e}")
+            return None
 
     def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
         """
@@ -217,24 +319,40 @@ class RedisBackend(CacheBackend):
         :param key: 缓存的键
         :param region: 缓存的区
         """
-        redis_key = self.get_redis_key(region, key)
-        self.client.delete(redis_key)
+        try:
+            redis_key = self.get_redis_key(region, key)
+            self.client.delete(redis_key)
+        except Exception as e:
+            logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}")
 
     def clear(self, region: Optional[str] = None) -> None:
         """
-        清除 Redis 中指定区域的缓存或全部缓存
+        清除指定区域的缓存或全部缓存
 
         :param region: 缓存的区
         """
-        if region:
-            # 清除指定区域的所有键
-            pattern = f"{region}:*"
-            keys = list(self.client.keys(pattern))
-            if keys:
-                self.client.delete(*keys)
-        else:
-            # 清除所有缓存
-            self.client.flushdb()
+        try:
+            if region:
+                cache_region = self.get_region(quote(region))
+                redis_key = f"{cache_region}:key:*"
+                # self.client.delete(*self.client.keys(redis_key))
+                with self.client.pipeline() as pipe:
+                    for key in self.client.scan_iter(redis_key):
+                        pipe.delete(key)
+                    pipe.execute()
+                logger.info(f"Cleared Redis cache for region: {region}")
+            else:
+                self.client.flushdb()
+                logger.info("Cleared all Redis cache")
+        except Exception as e:
+            logger.error(f"Failed to clear cache, region: {region}, error: {e}")
+
+    def close(self) -> None:
+        """
+        关闭 Redis 客户端的连接池
+        """
+        if self.client:
+            self.client.close()
 
 
 def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend:
@@ -245,10 +363,23 @@ def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend:
 
     :param ttl: 缓存的默认存活时间,单位秒
     :return: 返回缓存后端实例
     """
-    cache_type = os.getenv("CACHE_TYPE", "cachetools").lower()
+    cache_type = settings.CACHE_BACKEND_TYPE
+    logger.debug(f"Cache backend type from settings: {cache_type}")
     if cache_type == "redis":
-        return RedisBackend(redis_url=os.getenv("REDIS_URL", "redis://localhost"))
+        redis_url = settings.CACHE_BACKEND_URL
+        if redis_url:
+            try:
+                logger.debug(f"Attempting to use RedisBackend with URL: {redis_url}, TTL: {ttl}")
+                return RedisBackend(redis_url=redis_url, ttl=ttl)
+            except RuntimeError:
+                logger.warning("Falling back to CacheToolsBackend due to Redis connection failure.")
+        else:
+            logger.debug("Cache backend type is redis, but no valid REDIS_URL found. "
+                         "Falling back to CacheToolsBackend.")
+
+    # 如果不是 Redis,回退到内存缓存
+    logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}")
     return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
 
 
@@ -330,7 +461,6 @@ def cached(region: Optional[str] = None, maxsize: int = 1000, ttl: int = 1800,
         """
         # 清理缓存区
         cache_backend.clear(region=cache_region)
-        print(f"{cache_region} region cache is cleared")
 
     wrapper.cache_region = cache_region
     wrapper.cache_clear = cache_clear
@@ -341,3 +471,15 @@ def cached(region: Optional[str] = None, maxsize: int = 1000, ttl: int = 1800,
 
 # 缓存后端实例
 cache_backend = get_cache_backend()
+
+
+def close_cache() -> None:
+    """
+    关闭缓存后端连接并清理资源
+    """
+    try:
+        if cache_backend:
+            cache_backend.close()
+        logger.info("Cache backend closed successfully.")
+    except Exception as e:
+        logger.info(f"Error while closing cache backend: {e}")
diff --git a/app/core/config.py b/app/core/config.py
index babe3408..adf30688 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -71,6 +71,12 @@ class ConfigModel(BaseModel):
     DB_TIMEOUT: int = 60
     # SQLite 是否启用 WAL 模式,默认关闭
     DB_WAL_ENABLE: bool = False
+    # 缓存类型,支持 cachetools 和 redis,默认使用 cachetools
+    CACHE_BACKEND_TYPE: str = "cachetools"
+    # 缓存连接字符串,仅外部缓存(如 Redis、Memcached)需要
+    CACHE_BACKEND_URL: Optional[str] = None
+    # Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb"
+    CACHE_REDIS_MAXMEMORY: Optional[str] = None
     # 配置文件目录
     CONFIG_DIR: Optional[str] = None
     # 超级管理员
@@ -351,7 +357,7 @@
         return default, True
 
     @validator('*', pre=True, always=True)
-    def generic_type_validator(cls, value: Any, field): # noqa
+    def generic_type_validator(cls, value: Any, field):  # noqa
         """
         通用校验器,尝试将配置值转换为期望的类型
         """
diff --git a/app/scheduler.py b/app/scheduler.py
index 3b1fbeae..5e118d0b 100644
--- a/app/scheduler.py
+++ b/app/scheduler.py
@@ -15,7 +15,6 @@ from app.chain.recommend import RecommendChain
 from app.chain.site import SiteChain
 from app.chain.subscribe import SubscribeChain
 from app.chain.tmdb import TmdbChain
-from app.chain.torrents import TorrentsChain
 from app.chain.transfer import TransferChain
 from app.core.config import settings
 from app.core.event import EventManager
diff --git a/app/startup/modules_initializer.py b/app/startup/modules_initializer.py
index 66e59733..96a845f1 100644
--- a/app/startup/modules_initializer.py
+++ b/app/startup/modules_initializer.py
@@ -2,6 +2,7 @@ import sys
 
 from fastapi import FastAPI
 
+from app.core.cache import close_cache
 from app.core.config import global_vars, settings
 from app.core.module import ModuleManager
 from app.log import logger
@@ -129,6 +130,8 @@ def shutdown_modules(_: FastAPI):
     Monitor().stop()
     # 停止线程池
     ThreadHelper().shutdown()
+    # 停止缓存连接
+    close_cache()
     # 停止数据库连接
     close_database()
     # 停止前端服务