Merge pull request #3755 from InfinityPacer/feature/cache

This commit is contained in:
jxxghp
2025-01-19 12:56:56 +08:00
committed by GitHub
4 changed files with 178 additions and 28 deletions

View File

@@ -1,13 +1,18 @@
import inspect
import os
import json
import pickle
from abc import ABC, abstractmethod
from functools import wraps
from typing import Any, Dict, Optional
from urllib.parse import quote
import redis
from cachetools import TTLCache
from cachetools.keys import hashkey
from app.core.config import settings
from app.log import logger
# 默认缓存区
DEFAULT_CACHE_REGION = "DEFAULT"
@@ -60,6 +65,13 @@ class CacheBackend(ABC):
"""
pass
@abstractmethod
def close(self) -> None:
"""
关闭缓存连接
"""
pass
@staticmethod
def get_region(region: str = DEFAULT_CACHE_REGION):
"""
@@ -154,15 +166,32 @@ class CacheToolsBackend(CacheBackend):
region_cache = self.__get_region_cache(region)
if region_cache:
region_cache.clear()
logger.info(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
region_cache.clear()
logger.info("Cleared all cache")
def close(self) -> None:
"""
内存缓存不需要关闭资源
"""
pass
class RedisBackend(CacheBackend):
"""
基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存
特性:
- 支持动态设置缓存的 TTLTime To Live存活时间
- 支持分区域region管理缓存不同的 region 采用独立的命名空间
- 支持自定义最大内存限制maxmemory和内存淘汰策略如 allkeys-lru
限制:
- 由于 Redis 的分布式特性,写入和读取可能受到网络延迟的影响
- Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存
"""
def __init__(self, redis_url: str = "redis://localhost", ttl: int = 1800):
@@ -174,15 +203,76 @@ class RedisBackend(CacheBackend):
"""
self.redis_url = redis_url
self.ttl = ttl
self.client = redis.StrictRedis.from_url(redis_url)
try:
self.client = redis.Redis.from_url(
redis_url,
decode_responses=False,
socket_timeout=30,
socket_connect_timeout=5,
health_check_interval=60,
)
# 测试连接,确保 Redis 可用
self.client.ping()
logger.debug(f"Successfully connected to Redis")
self.set_memory_limit()
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
raise RuntimeError("Redis connection failed") from e
def set_memory_limit(self, policy: str = "allkeys-lru"):
"""
动态设置 Redis 最大内存和内存淘汰策略
:param policy: 淘汰策略(如 'allkeys-lru'
"""
try:
# 如果有显式值,则直接使用,为 0 时说明不限制,如果未配置,开启 BIG_MEMORY_MODE 时为 "1024mb",未开启时为 "256mb"
maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb")
self.client.config_set("maxmemory", maxmemory)
self.client.config_set("maxmemory-policy", policy)
logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}")
except Exception as e:
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def get_redis_key(region, key):
def serialize(value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
try:
# 尝试 JSON 序列化
return b"JSON" + b"\x00" + json.dumps(value).encode("utf-8")
except TypeError:
# 如果 JSON 序列化失败,使用 Pickle 序列化
return b"PICKLE" + b"\x00" + pickle.dumps(value)
@staticmethod
def deserialize(value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
# @staticmethod
# def serialize(value: Any) -> bytes:
# return msgpack.packb(value, use_bin_type=True)
#
# @staticmethod
# def deserialize(value: bytes) -> Any:
# return msgpack.unpackb(value, raw=False)
def get_redis_key(self, region: str, key: str) -> str:
"""
获取缓存 Key
"""
# 使用 region 作为缓存键的一部分
return f"region:{region}:key:{key}"
region = self.get_region(quote(region))
return f"{region}:key:{quote(key)}"
def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
@@ -194,11 +284,17 @@ class RedisBackend(CacheBackend):
:param region: 缓存的区
:param kwargs: kwargs
"""
ttl = ttl or self.ttl
redis_key = self.get_redis_key(region, key)
self.client.setex(redis_key, ttl, value)
try:
ttl = ttl or self.ttl
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
kwargs.pop("maxsize", None)
self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
except Exception as e:
logger.error(f"Failed to set key: {key} in region: {region}, error: {e}")
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Optional[Any]:
"""
获取缓存的值
@@ -206,9 +302,15 @@ class RedisBackend(CacheBackend):
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
redis_key = self.get_redis_key(region, key)
value = self.client.get(redis_key)
return value
try:
redis_key = self.get_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
return self.deserialize(value) # noqa
return None
except Exception as e:
logger.error(f"Failed to get key: {key} in region: {region}, error: {e}")
return None
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
@@ -217,24 +319,40 @@ class RedisBackend(CacheBackend):
:param key: 缓存的键
:param region: 缓存的区
"""
redis_key = self.get_redis_key(region, key)
self.client.delete(redis_key)
try:
redis_key = self.get_redis_key(region, key)
self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}")
def clear(self, region: Optional[str] = None) -> None:
"""
清除 Redis 中指定区域的缓存或全部缓存
清除指定区域的缓存或全部缓存
:param region: 缓存的区
"""
if region:
# 清除指定区域的所有键
pattern = f"{region}:*"
keys = list(self.client.keys(pattern))
if keys:
self.client.delete(*keys)
else:
# 清除所有缓存
self.client.flushdb()
try:
if region:
cache_region = self.get_region(quote(region))
redis_key = f"{cache_region}:key:*"
# self.client.delete(*self.client.keys(redis_key))
with self.client.pipeline() as pipe:
for key in self.client.scan_iter(redis_key):
pipe.delete(key)
pipe.execute()
logger.info(f"Cleared Redis cache for region: {region}")
else:
self.client.flushdb()
logger.info("Cleared all Redis cache")
except Exception as e:
logger.error(f"Failed to clear cache, region: {region}, error: {e}")
def close(self) -> None:
"""
关闭 Redis 客户端的连接池
"""
if self.client:
self.client.close()
def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend:
@@ -245,10 +363,23 @@ def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend:
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回缓存后端实例
"""
cache_type = os.getenv("CACHE_TYPE", "cachetools").lower()
cache_type = settings.CACHE_BACKEND_TYPE
logger.debug(f"Cache backend type from settings: {cache_type}")
if cache_type == "redis":
return RedisBackend(redis_url=os.getenv("REDIS_URL", "redis://localhost"))
redis_url = settings.CACHE_BACKEND_URL
if redis_url:
try:
logger.debug(f"Attempting to use RedisBackend with URL: {redis_url}, TTL: {ttl}")
return RedisBackend(redis_url=redis_url, ttl=ttl)
except RuntimeError:
logger.warning("Falling back to CacheToolsBackend due to Redis connection failure.")
else:
logger.debug("Cache backend type is redis, but no valid REDIS_URL found. "
"Falling back to CacheToolsBackend.")
# 如果不是 Redis回退到内存缓存
logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}")
return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
@@ -330,7 +461,6 @@ def cached(region: Optional[str] = None, maxsize: int = 1000, ttl: int = 1800,
"""
# 清理缓存区
cache_backend.clear(region=cache_region)
print(f"{cache_region} region cache is cleared")
wrapper.cache_region = cache_region
wrapper.cache_clear = cache_clear
@@ -341,3 +471,15 @@ def cached(region: Optional[str] = None, maxsize: int = 1000, ttl: int = 1800,
# 缓存后端实例
cache_backend = get_cache_backend()
def close_cache() -> None:
"""
关闭缓存后端连接并清理资源
"""
try:
if cache_backend:
cache_backend.close()
logger.info("Cache backend closed successfully.")
except Exception as e:
logger.info(f"Error while closing cache backend: {e}")

View File

@@ -71,6 +71,12 @@ class ConfigModel(BaseModel):
DB_TIMEOUT: int = 60
# SQLite 是否启用 WAL 模式,默认关闭
DB_WAL_ENABLE: bool = False
# 缓存类型,支持 cachetools 和 redis默认使用 cachetools
CACHE_BACKEND_TYPE: str = "cachetools"
# 缓存连接字符串,仅外部缓存(如 Redis、Memcached需要
CACHE_BACKEND_URL: Optional[str] = None
# Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb"
CACHE_REDIS_MAXMEMORY: Optional[str] = None
# 配置文件目录
CONFIG_DIR: Optional[str] = None
# 超级管理员
@@ -351,7 +357,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
return default, True
@validator('*', pre=True, always=True)
def generic_type_validator(cls, value: Any, field): # noqa
def generic_type_validator(cls, value: Any, field): # noqa
"""
通用校验器,尝试将配置值转换为期望的类型
"""

View File

@@ -15,7 +15,6 @@ from app.chain.recommend import RecommendChain
from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.tmdb import TmdbChain
from app.chain.torrents import TorrentsChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.event import EventManager

View File

@@ -2,6 +2,7 @@ import sys
from fastapi import FastAPI
from app.core.cache import close_cache
from app.core.config import global_vars, settings
from app.core.module import ModuleManager
from app.log import logger
@@ -129,6 +130,8 @@ def shutdown_modules(_: FastAPI):
Monitor().stop()
# 停止线程池
ThreadHelper().shutdown()
# 停止缓存连接
close_cache()
# 停止数据库连接
close_database()
# 停止前端服务