重构Redis缓存机制

This commit is contained in:
jxxghp
2025-08-20 08:51:03 +08:00
parent 5eb2dec32d
commit 89e8a64734
4 changed files with 140 additions and 191 deletions

View File

@@ -7,10 +7,9 @@ from collections.abc import Callable
from pathlib import Path
from typing import Optional, Any, Tuple, List, Set, Union, Dict
from fastapi.concurrency import run_in_threadpool
import aiofiles
from anyio import Path as AsyncPath
from fastapi.concurrency import run_in_threadpool
from qbittorrentapi import TorrentFilesList
from transmission_rpc import File
@@ -23,6 +22,7 @@ from app.core.plugin import PluginManager
from app.db.message_oper import MessageOper
from app.db.user_oper import UserOper
from app.helper.message import MessageHelper, MessageQueueManager, MessageTemplateHelper
from app.helper.redis import RedisHelper
from app.helper.service import ServiceConfigHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
@@ -48,12 +48,29 @@ class ChainBase(metaclass=ABCMeta):
send_callback=self.run_module
)
self.pluginmanager = PluginManager()
# 初始化Redis缓存助手
self._redis_helper = None
if settings.CACHE_BACKEND_TYPE == "redis":
try:
self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL)
except RuntimeError as e:
logger.warning(f"Redis缓存初始化失败,将使用本地缓存: {e}")
@staticmethod
def load_cache(filename: str) -> Any:
def load_cache(self, filename: str) -> Any:
"""
从本地加载缓存
加载缓存,优先从Redis读取,没有数据时从本地读取,兼容存量未迁移数据
"""
# 如果Redis可用,优先从Redis读取
if self._redis_helper:
try:
cache_data = self._redis_helper.get(filename, region="chain_cache")
if cache_data is not None:
logger.debug(f"从Redis加载缓存: {filename}")
return cache_data
except Exception as e:
logger.warning(f"从Redis加载缓存 {filename} 失败: {e}")
# 从本地文件读取(兼容存量数据)
cache_path = settings.TEMP_PATH / filename
if cache_path.exists():
try:
@@ -63,11 +80,21 @@ class ChainBase(metaclass=ABCMeta):
logger.error(f"加载缓存 {filename} 出错:{str(err)}")
return None
@staticmethod
async def async_load_cache(filename: str) -> Any:
async def async_load_cache(self, filename: str) -> Any:
"""
异步从本地加载缓存
异步加载缓存,优先从Redis读取,没有数据时从本地读取,兼容存量未迁移数据
"""
# 如果Redis可用,优先从Redis读取
if self._redis_helper:
try:
cache_data = self._redis_helper.get(filename, region="chain_cache")
if cache_data is not None:
logger.debug(f"从Redis异步加载缓存: {filename}")
return cache_data
except Exception as e:
logger.warning(f"从Redis异步加载缓存 {filename} 失败: {e}")
# 从本地文件读取(兼容存量数据)
cache_path = settings.TEMP_PATH / filename
if cache_path.exists():
try:
@@ -75,51 +102,88 @@ class ChainBase(metaclass=ABCMeta):
content = await f.read()
return pickle.loads(content)
except Exception as err:
logger.error(f"加载缓存 {filename} 出错:{str(err)}")
logger.error(f"异步加载缓存 {filename} 出错:{str(err)}")
return None
@staticmethod
async def async_save_cache(cache: Any, filename: str) -> None:
async def async_save_cache(self, cache: Any, filename: str) -> None:
"""
异步保存缓存到本地
异步保存缓存,优先保存到Redis,同时保存到本地作为备份
"""
try:
async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f:
await f.write(pickle.dumps(cache))
except Exception as err:
logger.error(f"保存缓存 {filename} 出错:{str(err)}")
# 如果Redis可用,优先保存到Redis
if self._redis_helper:
try:
self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache")
logger.debug(f"异步保存缓存到Redis: {filename}")
except Exception as e:
logger.warning(f"异步保存缓存到Redis失败: {e}")
else:
# 保存到本地
try:
async with aiofiles.open(settings.TEMP_PATH / filename, 'wb') as f:
await f.write(pickle.dumps(cache))
except Exception as err:
logger.error(f"异步保存缓存到本地 {filename} 出错:{str(err)}")
@staticmethod
def save_cache(cache: Any, filename: str) -> None:
def save_cache(self, cache: Any, filename: str) -> None:
"""
保存缓存到本地
保存缓存,优先保存到Redis,同时保存到本地作为备份
"""
try:
with open(settings.TEMP_PATH / filename, 'wb') as f:
pickle.dump(cache, f) # noqa
except Exception as err:
logger.error(f"保存缓存 {filename} 出错:{str(err)}")
# 如果Redis可用,优先保存到Redis
if self._redis_helper:
try:
self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache")
logger.debug(f"保存缓存到Redis: {filename}")
except Exception as e:
logger.warning(f"保存缓存到Redis失败: {e}")
else:
# 保存到本地
try:
with open(settings.TEMP_PATH / filename, 'wb') as f:
pickle.dump(cache, f) # noqa
except Exception as err:
logger.error(f"保存缓存到本地 {filename} 出错:{str(err)}")
@staticmethod
def remove_cache(filename: str) -> None:
def remove_cache(self, filename: str) -> None:
"""
删除本地缓存
删除缓存,同时删除Redis和本地缓存
"""
# 如果Redis可用,删除Redis缓存
if self._redis_helper:
try:
self._redis_helper.delete(filename, region="chain_cache")
logger.debug(f"删除Redis缓存: {filename}")
except Exception as e:
logger.warning(f"删除Redis缓存失败: {e}")
# 删除本地缓存
cache_path = settings.TEMP_PATH / filename
if cache_path.exists():
cache_path.unlink()
try:
cache_path.unlink()
logger.debug(f"删除本地缓存: {filename}")
except Exception as e:
logger.warning(f"删除本地缓存失败: {e}")
@staticmethod
async def async_remove_cache(filename: str) -> None:
async def async_remove_cache(self, filename: str) -> None:
"""
异步删除本地缓存
异步删除缓存,同时删除Redis和本地缓存
"""
# 如果Redis可用,删除Redis缓存
if self._redis_helper:
try:
self._redis_helper.delete(filename, region="chain_cache")
logger.debug(f"异步删除Redis缓存: {filename}")
except Exception as e:
logger.warning(f"异步删除Redis缓存失败: {e}")
# 删除本地缓存
cache_path = AsyncPath(settings.TEMP_PATH) / filename
if await cache_path.exists():
try:
await cache_path.unlink()
logger.debug(f"异步删除本地缓存: {filename}")
except Exception as err:
logger.error(f"异步删除缓存 {filename} 出错:{str(err)}")
logger.error(f"异步删除本地缓存 {filename} 出错:{str(err)}")
@staticmethod
def __is_valid_empty(ret):
@@ -923,12 +987,12 @@ class ChainBase(metaclass=ABCMeta):
immediately=True if message.userid else False)
async def async_post_message(self,
message: Optional[Notification] = None,
meta: Optional[MetaBase] = None,
mediainfo: Optional[MediaInfo] = None,
torrentinfo: Optional[TorrentInfo] = None,
transferinfo: Optional[TransferInfo] = None,
**kwargs) -> None:
message: Optional[Notification] = None,
meta: Optional[MetaBase] = None,
mediainfo: Optional[MediaInfo] = None,
torrentinfo: Optional[TorrentInfo] = None,
transferinfo: Optional[TransferInfo] = None,
**kwargs) -> None:
"""
异步发送消息
:param message: Notification实例
@@ -991,15 +1055,16 @@ class ChainBase(metaclass=ABCMeta):
break
# 按设定发送
await self.eventmanager.async_send_event(etype=EventType.NoticeMessage,
data={**send_message.dict(), "type": send_message.mtype})
data={**send_message.dict(), "type": send_message.mtype})
await self.messagequeue.async_send_message("post_message", message=send_message)
if not send_orignal:
return
# 发送消息事件
await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype})
await self.eventmanager.async_send_event(etype=EventType.NoticeMessage,
data={**message.dict(), "type": message.mtype})
# 按原消息发送
await self.messagequeue.async_send_message("post_message", message=message,
immediately=True if message.userid else False)
immediately=True if message.userid else False)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""

View File

@@ -1,17 +1,14 @@
import inspect
import json
import pickle
import threading
from abc import ABC, abstractmethod
from functools import wraps
from typing import Any, Dict, Optional
from urllib.parse import quote
import redis
from cachetools import TTLCache
from cachetools.keys import hashkey
from app.core.config import settings
from app.helper.redis import RedisHelper
from app.log import logger
# 默认缓存区
@@ -250,10 +247,6 @@ class RedisBackend(CacheBackend):
- Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存
"""
# 类型缓存集合,针对非容器简单类型
_complex_serializable_types = set()
_simple_serializable_types = set()
def __init__(self, redis_url: Optional[str] = "redis://localhost", ttl: Optional[int] = 1800):
"""
初始化 Redis 缓存实例
@@ -261,101 +254,13 @@ class RedisBackend(CacheBackend):
:param redis_url: Redis 服务的 URL
:param ttl: 缓存的存活时间,单位秒
"""
self.redis_url = redis_url
self.ttl = ttl
self._redis_helper = None
try:
self.client = redis.Redis.from_url(
redis_url,
decode_responses=False,
socket_timeout=30,
socket_connect_timeout=5,
health_check_interval=60,
)
# 测试连接,确保 Redis 可用
self.client.ping()
logger.debug(f"Successfully connected to Redis")
self.set_memory_limit()
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
raise RuntimeError("Redis connection failed") from e
def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"):
"""
动态设置 Redis 最大内存和内存淘汰策略
:param policy: 淘汰策略(如 'allkeys-lru'
"""
try:
# 如果有显式值,则直接使用,为 0 时说明不限制,如果未配置,开启 BIG_MEMORY_MODE 时为 "1024mb",未开启时为 "256mb"
maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb")
self.client.config_set("maxmemory", maxmemory)
self.client.config_set("maxmemory-policy", policy)
logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}")
except Exception as e:
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def is_container_type(t):
return t in (list, dict, tuple, set)
@classmethod
def serialize(cls, value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
vt = type(value)
# 针对非容器类型使用缓存策略
if not cls.is_container_type(vt):
# 如果已知需要复杂序列化
if vt in cls._complex_serializable_types:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 如果已知可以简单序列化
if vt in cls._simple_serializable_types:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
# 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化
try:
json_data = json.dumps(value).encode("utf-8")
cls._simple_serializable_types.add(vt)
return b"JSON" + b"\x00" + json_data
except TypeError:
cls._complex_serializable_types.add(vt)
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 针对容器类型,每次尝试简单序列化,不使用缓存
else:
try:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
except TypeError:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
@classmethod
def deserialize(cls, value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
# @staticmethod
# def serialize(value: Any) -> bytes:
# return msgpack.packb(value, use_bin_type=True)
#
# @staticmethod
# def deserialize(value: bytes) -> Any:
# return msgpack.unpackb(value, raw=False)
def get_redis_key(self, region: str, key: str) -> str:
"""
获取缓存 Key
"""
# 使用 region 作为缓存键的一部分
region = self.get_region(quote(region))
return f"{region}:key:{quote(key)}"
self.redis_helper = RedisHelper(redis_url=redis_url)
except RuntimeError as e:
logger.warning(f"Redis缓存初始化失败: {e}")
raise e
def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
@@ -368,15 +273,8 @@ class RedisBackend(CacheBackend):
:param region: 缓存的区
:param kwargs: kwargs
"""
try:
ttl = ttl or self.ttl
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
kwargs.pop("maxsize", None)
self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
except Exception as e:
logger.error(f"Failed to set key: {key} in region: {region}, error: {e}")
ttl = ttl or self.ttl
self.redis_helper.set(key, value, ttl=ttl, region=region, **kwargs)
def exists(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> bool:
"""
@@ -386,12 +284,7 @@ class RedisBackend(CacheBackend):
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
try:
redis_key = self.get_redis_key(region, key)
return self.client.exists(redis_key) == 1
except Exception as e:
logger.error(f"Failed to exists key: {key} region: {region}, error: {e}")
return False
return self.redis_helper.exists(key, region=region)
def get(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> Optional[Any]:
"""
@@ -401,15 +294,7 @@ class RedisBackend(CacheBackend):
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
try:
redis_key = self.get_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
return self.deserialize(value) # noqa
return None
except Exception as e:
logger.error(f"Failed to get key: {key} in region: {region}, error: {e}")
return None
return self.redis_helper.get(key, region=region)
def delete(self, key: str, region: Optional[str] = DEFAULT_CACHE_REGION) -> None:
"""
@@ -418,11 +303,7 @@ class RedisBackend(CacheBackend):
:param key: 缓存的键
:param region: 缓存的区
"""
try:
redis_key = self.get_redis_key(region, key)
self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}")
self.redis_helper.delete(key, region=region)
def clear(self, region: Optional[str] = None) -> None:
"""
@@ -430,28 +311,13 @@ class RedisBackend(CacheBackend):
:param region: 缓存的区
"""
try:
if region:
cache_region = self.get_region(quote(region))
redis_key = f"{cache_region}:key:*"
# self.client.delete(*self.client.keys(redis_key))
with self.client.pipeline() as pipe:
for key in self.client.scan_iter(redis_key):
pipe.delete(key)
pipe.execute()
logger.info(f"Cleared Redis cache for region: {region}")
else:
self.client.flushdb()
logger.info("Cleared all Redis cache")
except Exception as e:
logger.error(f"Failed to clear cache, region: {region}, error: {e}")
self.redis_helper.clear(region=region)
def close(self) -> None:
"""
关闭 Redis 客户端的连接池
"""
if self.client:
self.client.close()
self.redis_helper.close()
def get_cache_backend(maxsize: Optional[int] = 512, ttl: Optional[int] = 1800) -> CacheBackend:

View File

@@ -136,7 +136,7 @@ class ConfigModel(BaseModel):
# 缓存类型,支持 cachetools 和 redis,默认使用 cachetools
CACHE_BACKEND_TYPE: str = "cachetools"
# 缓存连接字符串,仅外部缓存(如 Redis、Memcached)需要
CACHE_BACKEND_URL: Optional[str] = None
CACHE_BACKEND_URL: Optional[str] = "redis://localhost:6379"
# Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb"
CACHE_REDIS_MAXMEMORY: Optional[str] = None

View File

@@ -6,10 +6,14 @@ from urllib.parse import quote
import redis
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import SingletonClass
class RedisHelper:
class RedisHelper(metaclass=SingletonClass):
"""
Redis连接和操作助手类
@@ -54,6 +58,20 @@ class RedisHelper:
logger.error(f"Failed to connect to Redis: {e}")
raise RuntimeError("Redis connection failed") from e
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件,更新Redis设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']:
return
logger.info("配置变更,重连Redis...")
self._connect()
def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"):
"""
动态设置Redis最大内存和内存淘汰策略